JavaRush /Курсы /JAVA 25 SELF /ForkJoinPool и RecursiveTask: рекурсивные задачи

ForkJoinPool и RecursiveTask: рекурсивные задачи

JAVA 25 SELF
54 уровень , 3 лекция
Открыта

1. ForkJoinPool: что это такое и зачем он нужен

ForkJoinPool — это специальный пул потоков, реализующий подход «разделяй и властвуй» (divide and conquer). Его задача — максимально эффективно распараллеливать работу, когда большую задачу можно разбить на ряд независимых подзадач, выполнить их параллельно, а затем объединить результаты.

  • Fork (разделить) — задача делится на подзадачи.
  • Join (объединить) — результаты подзадач собираются в итог.

ForkJoinPool — это сердце параллельных стримов в Java: когда вы пишете list.parallelStream(), внутри используется именно он. Но вы можете применять его и напрямую, получая больше контроля.

Когда ForkJoinPool особенно полезен

ForkJoinPool раскрывается на задачах, которые легко дробятся на независимые части: например, обработка очень больших массивов, когда каждый фрагмент обрабатывается отдельно, а затем результаты объединяются.

  • Задачу легко разбить на независимые подзадачи: сортировка, поиск, суммирование.
  • Подзадачи примерно одинаковы по объёму и не зависят друг от друга.
  • Нужно задействовать все ядра процессора для максимальной скорости.
+---------------------+
|   Большая задача    |
+---------------------+
          |
          v
+---------+---------+
|  Подзадача 1      |
|  Подзадача 2      |
|  ...              |
+-------------------+
          |
          v
+---------+---------+
|  Результаты       |
+-------------------+

Именно так работает «разделяй и властвуй»: разбили — посчитали параллельно — объединили.

2. RecursiveTask и RecursiveAction: две стороны одной медали

В ForkJoinPool задачи оформляются через специальные классы, умеющие дробиться на подзадачи и объединять результаты. RecursiveTask<T> возвращает результат, а RecursiveAction — нет. На практике чаще используют RecursiveTask, чтобы, например, вернуть сумму, максимум или количество.

Чтобы создать такую задачу, наследуемся и реализуем метод compute(). В нём описываем логику: если задача маленькая — решаем сразу; если большая — делим на подзадачи, запускаем их параллельно через fork() и объединяем результаты с помощью join(). Так рождается естественный рекурсивный параллелизм.

3. Синтаксис и пример: параллельное вычисление суммы массива

Пусть у нас есть большой массив чисел, и мы хотим быстро посчитать сумму всех элементов.

Шаг 1. Класс задачи

import java.util.concurrent.RecursiveTask;

public class ArraySumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // Порог для деления задачи
    private final int[] array;
    private final int start, end;

    public ArraySumTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        // Если задача маленькая — считаем напрямую
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        } else {
            // Делим задачу на две подзадачи
            int mid = (start + end) / 2;
            ArraySumTask leftTask = new ArraySumTask(array, start, mid);
            ArraySumTask rightTask = new ArraySumTask(array, mid, end);

            // Запускаем подзадачи параллельно
            leftTask.fork(); // Асинхронно
            long rightResult = rightTask.compute(); // Синхронно
            long leftResult = leftTask.join(); // Ждём завершения левой

            // Объединяем результат
            return leftResult + rightResult;
        }
    }
}
  • Если задача маленькая (меньше порога THRESHOLD) — считаем сумму обычным циклом.
  • Если большая — делим на две, одну запускаем асинхронно через fork(), вторую считаем синхронно через compute(), затем объединяем через join().

Шаг 2. Запуск задачи через ForkJoinPool

import java.util.concurrent.ForkJoinPool;

public class ForkJoinSumDemo {
    public static void main(String[] args) {
        int[] numbers = new int[10_000_000];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = 1; // Для простоты — сумма должна быть равна длине массива
        }

        ForkJoinPool pool = new ForkJoinPool(); // По умолчанию — по числу ядер

        ArraySumTask task = new ArraySumTask(numbers, 0, numbers.length);

        long result = pool.invoke(task); // Запуск задачи

        System.out.println("Сумма элементов массива: " + result);
    }
}

Как это работает?

  • ForkJoinPool сам решает, сколько потоков использовать (обычно — по числу ядер).
  • Задача автоматически делится на подзадачи, каждая из которых может исполняться на отдельном ядре.
  • Производительность обычно выше, чем у последовательного кода (особенно на больших данных и многоядерных системах).

4. Как работает ForkJoinPool: немного «под капотом»

Work-Stealing (воровство работы)

ForkJoinPool реализует «воровство работы»: если у какого-то потока закончились задачи, он «крадёт» работу у другого потока. Это обеспечивает эффективную балансировку нагрузки и загрузку всех ядер.

Базовый алгоритм

  • Главная задача делится на подзадачи.
  • Подзадачи помещаются в специализированные очереди.
  • Потоки берут задачи из своих очередей, а при их опустошении — «ищут» работу у соседей.
  • Когда всё выполнено, результаты объединяются.

Схема работы

flowchart TD A[Главная задача] --> B1[Подзадача 1] A --> B2[Подзадача 2] B1 --> C1[Маленькая задача 1] B1 --> C2[Маленькая задача 2] B2 --> C3[Маленькая задача 3] B2 --> C4[Маленькая задача 4] C1 --> D[Объединение результатов] C2 --> D C3 --> D C4 --> D

5. RecursiveAction — если не нужно возвращать результат

Если нужно просто что-то выполнить параллельно и не возвращать результат, используйте RecursiveAction. Типичные примеры — распараллелить заполнение массива, печать, сортировку «на месте» и т.п.

import java.util.concurrent.RecursiveAction;

public class PrintTask extends RecursiveAction {
    private static final int THRESHOLD = 100;
    private final int[] array;
    private final int start, end;

    public PrintTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            for (int i = start; i < end; i++) {
                System.out.print(array[i] + " ");
            }
        } else {
            int mid = (start + end) / 2;
            invokeAll(
                new PrintTask(array, start, mid),
                new PrintTask(array, mid, end)
            );
        }
    }
}

6. Практика: параллельный поиск максимального значения в массиве

import java.util.concurrent.RecursiveTask;

public class MaxFindTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 1000;
    private final int[] array;
    private final int start, end;

    public MaxFindTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end - start <= THRESHOLD) {
            int max = array[start];
            for (int i = start + 1; i < end; i++) {
                if (array[i] > max) max = array[i];
            }
            return max;
        } else {
            int mid = (start + end) / 2;
            MaxFindTask left = new MaxFindTask(array, start, mid);
            MaxFindTask right = new MaxFindTask(array, mid, end);
            left.fork();
            int rightResult = right.compute();
            int leftResult = left.join();
            return Math.max(leftResult, rightResult);
        }
    }
}

Запуск:

import java.util.concurrent.ForkJoinPool;

public class ForkJoinMaxDemo {
    public static void main(String[] args) {
        int[] array = new int[5_000_000];
        for (int i = 0; i < array.length; i++) {
            array[i] = (int)(Math.random() * 1_000_000);
        }

        ForkJoinPool pool = new ForkJoinPool();
        MaxFindTask task = new MaxFindTask(array, 0, array.length);

        int max = pool.invoke(task);

        System.out.println("Максимальное значение: " + max);
    }
}

7. Преимущества и ограничения ForkJoinPool

Преимущества

  • Автоматическая балансировка нагрузки. Work-stealing позволяет эффективно использовать все ядра.
  • Удобство. Не нужно вручную создавать и управлять потоками.
  • Высокая производительность. Особенно на больших задачах и многоядерных системах.
  • Гибкость. Можно дробить задачи на столько частей, сколько нужно.

Ограничения

  • Сильная связность между подзадачами. Если подзадачи часто ждут друг друга, выгода снижается.
  • Слишком мелкие задачи. Накладные расходы на деление/синхронизацию могут «съесть» преимущество.
  • Побочные эффекты. Нельзя изменять общие переменные без синхронизации — получите race condition.
  • Применимость. Хорош для задач, дробимых на независимые части.

8. Типичные ошибки при работе с ForkJoinPool и RecursiveTask

Ошибка №1: Слишком мелкое деление задачи. Если порог (THRESHOLD) слишком маленький, получится много крошечных задач — затраты на их создание и синхронизацию превысят выигрыш от параллелизма. Экспериментируйте с порогом: оптимальные значения часто в тысячах или десятках тысяч элементов.

Ошибка №2: Использование общих изменяемых переменных. Если подзадачи пишут в общую переменную без синхронизации — получите гонки данных (race condition). Возвращайте результат через compute() и объединяйте только в join().

Ошибка №3: Неправильное использование fork/join. Забыли вызвать fork() или join() — и подзадача не запустится параллельно или результат «потеряется». Внимательно следите за порядком вызовов.

Ошибка №4: Запуск ForkJoinTask вне ForkJoinPool. Если просто вызвать compute() у задачи, она выполнится в текущем потоке, без параллелизма. Для настоящей магии используйте pool.invoke() или pool.submit().

Ошибка №5: Игнорирование исключений. Если в задаче произошло исключение, оно проявится при вызове join() или invoke(). Не забывайте обрабатывать ошибки.

Ошибка №6: Использование ForkJoinPool для задач с блокировками. ForkJoinPool плохо подходит для задач, которые часто блокируются (ожидают I/O и т.д.). В таких случаях лучше использовать ExecutorService.

1
Задача
JAVA 25 SELF, 54 уровень, 3 лекция
Недоступна
Распределение задач для цифровых бухгалтеров 💼
Распределение задач для цифровых бухгалтеров 💼
1
Задача
JAVA 25 SELF, 54 уровень, 3 лекция
Недоступна
Разгадка криптографического кода 🕵️‍♀️
Разгадка криптографического кода 🕵️‍♀️
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ