1. ForkJoinPool: что это такое и зачем он нужен
ForkJoinPool — это специальный пул потоков, реализующий подход «разделяй и властвуй» (divide and conquer). Его задача — максимально эффективно распараллеливать работу, когда большую задачу можно разбить на ряд независимых подзадач, выполнить их параллельно, а затем объединить результаты.
- Fork (разделить) — задача делится на подзадачи.
- Join (объединить) — результаты подзадач собираются в итог.
ForkJoinPool — это сердце параллельных стримов в Java: когда вы пишете list.parallelStream(), внутри используется именно он. Но вы можете применять его и напрямую, получая больше контроля.
Когда ForkJoinPool особенно полезен
ForkJoinPool раскрывается на задачах, которые легко дробятся на независимые части: например, обработка очень больших массивов, когда каждый фрагмент обрабатывается отдельно, а затем результаты объединяются.
- Задачу легко разбить на независимые подзадачи: сортировка, поиск, суммирование.
- Подзадачи примерно одинаковы по объёму и не зависят друг от друга.
- Нужно задействовать все ядра процессора для максимальной скорости.
+---------------------+
| Большая задача |
+---------------------+
|
v
+---------+---------+
| Подзадача 1 |
| Подзадача 2 |
| ... |
+-------------------+
|
v
+---------+---------+
| Результаты |
+-------------------+
Именно так работает «разделяй и властвуй»: разбили — посчитали параллельно — объединили.
2. RecursiveTask и RecursiveAction: две стороны одной медали
В ForkJoinPool задачи оформляются через специальные классы, умеющие дробиться на подзадачи и объединять результаты. RecursiveTask<T> возвращает результат, а RecursiveAction — нет. На практике чаще используют RecursiveTask, чтобы, например, вернуть сумму, максимум или количество.
Чтобы создать такую задачу, наследуемся и реализуем метод compute(). В нём описываем логику: если задача маленькая — решаем сразу; если большая — делим на подзадачи, запускаем их параллельно через fork() и объединяем результаты с помощью join(). Так рождается естественный рекурсивный параллелизм.
3. Синтаксис и пример: параллельное вычисление суммы массива
Пусть у нас есть большой массив чисел, и мы хотим быстро посчитать сумму всех элементов.
Шаг 1. Класс задачи
import java.util.concurrent.RecursiveTask;
public class ArraySumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 1_000; // Порог для деления задачи
private final int[] array;
private final int start, end;
public ArraySumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
// Если задача маленькая — считаем напрямую
if (end - start <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
// Делим задачу на две подзадачи
int mid = (start + end) / 2;
ArraySumTask leftTask = new ArraySumTask(array, start, mid);
ArraySumTask rightTask = new ArraySumTask(array, mid, end);
// Запускаем подзадачи параллельно
leftTask.fork(); // Асинхронно
long rightResult = rightTask.compute(); // Синхронно
long leftResult = leftTask.join(); // Ждём завершения левой
// Объединяем результат
return leftResult + rightResult;
}
}
}
- Если задача маленькая (меньше порога THRESHOLD) — считаем сумму обычным циклом.
- Если большая — делим на две, одну запускаем асинхронно через fork(), вторую считаем синхронно через compute(), затем объединяем через join().
Шаг 2. Запуск задачи через ForkJoinPool
import java.util.concurrent.ForkJoinPool;
public class ForkJoinSumDemo {
public static void main(String[] args) {
int[] numbers = new int[10_000_000];
for (int i = 0; i < numbers.length; i++) {
numbers[i] = 1; // Для простоты — сумма должна быть равна длине массива
}
ForkJoinPool pool = new ForkJoinPool(); // По умолчанию — по числу ядер
ArraySumTask task = new ArraySumTask(numbers, 0, numbers.length);
long result = pool.invoke(task); // Запуск задачи
System.out.println("Сумма элементов массива: " + result);
}
}
Как это работает?
- ForkJoinPool сам решает, сколько потоков использовать (обычно — по числу ядер).
- Задача автоматически делится на подзадачи, каждая из которых может исполняться на отдельном ядре.
- Производительность обычно выше, чем у последовательного кода (особенно на больших данных и многоядерных системах).
4. Как работает ForkJoinPool: немного «под капотом»
Work-Stealing (воровство работы)
ForkJoinPool реализует «воровство работы»: если у какого-то потока закончились задачи, он «крадёт» работу у другого потока. Это обеспечивает эффективную балансировку нагрузки и загрузку всех ядер.
Базовый алгоритм
- Главная задача делится на подзадачи.
- Подзадачи помещаются в специализированные очереди.
- Потоки берут задачи из своих очередей, а при их опустошении — «ищут» работу у соседей.
- Когда всё выполнено, результаты объединяются.
Схема работы
5. RecursiveAction — если не нужно возвращать результат
Если нужно просто что-то выполнить параллельно и не возвращать результат, используйте RecursiveAction. Типичные примеры — распараллелить заполнение массива, печать, сортировку «на месте» и т.п.
import java.util.concurrent.RecursiveAction;
public class PrintTask extends RecursiveAction {
private static final int THRESHOLD = 100;
private final int[] array;
private final int start, end;
public PrintTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start <= THRESHOLD) {
for (int i = start; i < end; i++) {
System.out.print(array[i] + " ");
}
} else {
int mid = (start + end) / 2;
invokeAll(
new PrintTask(array, start, mid),
new PrintTask(array, mid, end)
);
}
}
}
6. Практика: параллельный поиск максимального значения в массиве
import java.util.concurrent.RecursiveTask;
public class MaxFindTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 1000;
private final int[] array;
private final int start, end;
public MaxFindTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if (end - start <= THRESHOLD) {
int max = array[start];
for (int i = start + 1; i < end; i++) {
if (array[i] > max) max = array[i];
}
return max;
} else {
int mid = (start + end) / 2;
MaxFindTask left = new MaxFindTask(array, start, mid);
MaxFindTask right = new MaxFindTask(array, mid, end);
left.fork();
int rightResult = right.compute();
int leftResult = left.join();
return Math.max(leftResult, rightResult);
}
}
}
Запуск:
import java.util.concurrent.ForkJoinPool;
public class ForkJoinMaxDemo {
public static void main(String[] args) {
int[] array = new int[5_000_000];
for (int i = 0; i < array.length; i++) {
array[i] = (int)(Math.random() * 1_000_000);
}
ForkJoinPool pool = new ForkJoinPool();
MaxFindTask task = new MaxFindTask(array, 0, array.length);
int max = pool.invoke(task);
System.out.println("Максимальное значение: " + max);
}
}
7. Преимущества и ограничения ForkJoinPool
Преимущества
- Автоматическая балансировка нагрузки. Work-stealing позволяет эффективно использовать все ядра.
- Удобство. Не нужно вручную создавать и управлять потоками.
- Высокая производительность. Особенно на больших задачах и многоядерных системах.
- Гибкость. Можно дробить задачи на столько частей, сколько нужно.
Ограничения
- Сильная связность между подзадачами. Если подзадачи часто ждут друг друга, выгода снижается.
- Слишком мелкие задачи. Накладные расходы на деление/синхронизацию могут «съесть» преимущество.
- Побочные эффекты. Нельзя изменять общие переменные без синхронизации — получите race condition.
- Применимость. Хорош для задач, дробимых на независимые части.
8. Типичные ошибки при работе с ForkJoinPool и RecursiveTask
Ошибка №1: Слишком мелкое деление задачи. Если порог (THRESHOLD) слишком маленький, получится много крошечных задач — затраты на их создание и синхронизацию превысят выигрыш от параллелизма. Экспериментируйте с порогом: оптимальные значения часто в тысячах или десятках тысяч элементов.
Ошибка №2: Использование общих изменяемых переменных. Если подзадачи пишут в общую переменную без синхронизации — получите гонки данных (race condition). Возвращайте результат через compute() и объединяйте только в join().
Ошибка №3: Неправильное использование fork/join. Забыли вызвать fork() или join() — и подзадача не запустится параллельно или результат «потеряется». Внимательно следите за порядком вызовов.
Ошибка №4: Запуск ForkJoinTask вне ForkJoinPool. Если просто вызвать compute() у задачи, она выполнится в текущем потоке, без параллелизма. Для настоящей магии используйте pool.invoke() или pool.submit().
Ошибка №5: Игнорирование исключений. Если в задаче произошло исключение, оно проявится при вызове join() или invoke(). Не забывайте обрабатывать ошибки.
Ошибка №6: Использование ForkJoinPool для задач с блокировками. ForkJoinPool плохо подходит для задач, которые часто блокируются (ожидают I/O и т.д.). В таких случаях лучше использовать ExecutorService.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ