1. ForkJoinPool: що це таке і навіщо він потрібен
ForkJoinPool — це спеціальний пул потоків, що реалізує підхід «розділяй і володарюй» (divide and conquer). Його завдання — максимально ефективно розпаралелювати роботу, коли велику задачу можна розбити на низку незалежних підзавдань, виконати їх паралельно, а потім об’єднати результати.
- Fork (поділити) — завдання ділиться на підзавдання.
- Join (об’єднати) — результати підзавдань збираються в підсумок.
ForkJoinPool — це серце паралельних стрімів у Java: коли ви пишете list.parallelStream(), всередині використовується саме він. Але ви можете застосовувати його і напряму, отримуючи більше контролю.
Коли ForkJoinPool особливо корисний
ForkJoinPool розкривається на завданнях, які легко ділити на незалежні частини: наприклад, обробка дуже великих масивів, коли кожен фрагмент обробляється окремо, а потім результати об’єднуються.
- Завдання легко розбити на незалежні підзавдання: сортування, пошук, підсумовування.
- Підзавдання приблизно однакові за обсягом і не залежать одне від одного.
- Потрібно задіяти всі ядра процесора для максимальної швидкості.
+---------------------+
| Велике завдання |
+---------------------+
|
v
+---------+---------+
| Підзавдання 1 |
| Підзавдання 2 |
| ... |
+-------------------+
|
v
+---------+---------+
| Результати |
+-------------------+
Саме так працює «розділяй і володарюй»: розбили — порахували паралельно — об’єднали.
2. RecursiveTask і RecursiveAction: дві сторони однієї медалі
У ForkJoinPool завдання оформлюються через спеціальні класи, здатні ділитися на підзавдання та об’єднувати результати. RecursiveTask<T> повертає результат, а RecursiveAction — ні. На практиці частіше використовують RecursiveTask, щоб, наприклад, повернути суму, максимум або кількість.
Щоб створити таке завдання, наслідуємося та реалізуємо метод compute(). У ньому описуємо логіку: якщо завдання маленьке — розв’язуємо відразу; якщо велике — ділимо на підзавдання, запускаємо їх паралельно через fork() і об’єднуємо результати за допомогою join(). Так народжується природний рекурсивний паралелізм.
3. Синтаксис і приклад: паралельне обчислення суми масиву
Припустімо, у нас є великий масив чисел, і ми хочемо швидко підрахувати суму всіх елементів.
Крок 1. Клас завдання
import java.util.concurrent.RecursiveTask;
public class ArraySumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 1_000; // Поріг для поділу завдання
private final int[] array;
private final int start, end;
public ArraySumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
// Якщо завдання маленьке — рахуємо напряму
if (end - start <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
// Ділимо завдання на дві підзадачі
int mid = (start + end) / 2;
ArraySumTask leftTask = new ArraySumTask(array, start, mid);
ArraySumTask rightTask = new ArraySumTask(array, mid, end);
// Запускаємо підзадачі паралельно
leftTask.fork(); // Асинхронно
long rightResult = rightTask.compute(); // Синхронно
long leftResult = leftTask.join(); // Чекаємо завершення лівої
// Об’єднуємо результат
return leftResult + rightResult;
}
}
}
- Якщо завдання маленьке (менше за поріг THRESHOLD) — рахуємо суму звичайним циклом.
- Якщо велике — ділимо на дві, одну запускаємо асинхронно через fork(), другу рахуємо синхронно через compute(), потім об’єднуємо через join().
Крок 2. Запуск завдання через ForkJoinPool
import java.util.concurrent.ForkJoinPool;
public class ForkJoinSumDemo {
public static void main(String[] args) {
int[] numbers = new int[10_000_000];
for (int i = 0; i < numbers.length; i++) {
numbers[i] = 1; // Для простоти — сума має дорівнювати довжині масиву
}
ForkJoinPool pool = new ForkJoinPool(); // Типово — за кількістю ядер
ArraySumTask task = new ArraySumTask(numbers, 0, numbers.length);
long result = pool.invoke(task); // Запуск завдання
System.out.println("Сума елементів масиву: " + result);
}
}
Як це працює?
- ForkJoinPool сам вирішує, скільки потоків використовувати (зазвичай — за кількістю ядер).
- Завдання автоматично ділиться на підзавдання, кожне з яких може виконуватися на окремому ядрі.
- Продуктивність зазвичай вища, ніж у послідовного коду (особливо на великих даних і багатоядерних системах).
4. Як працює ForkJoinPool: трохи «під капотом»
Work-Stealing (крадіжка роботи)
ForkJoinPool реалізує «крадіжку роботи»: якщо в якогось потоку закінчилися завдання, він «краде» роботу в іншого потоку. Це забезпечує ефективне балансування навантаження і завантаження всіх ядер.
Базовий алгоритм
- Головне завдання ділиться на підзавдання.
- Підзавдання поміщаються у спеціалізовані черги.
- Потоки беруть завдання зі своїх черг, а за їх спустошення — «шукають» роботу в сусідів.
- Коли все виконано, результати об’єднуються.
Схема роботи
flowchart TD
A[Головне завдання] --> B1[Підзавдання 1]
A --> B2[Підзавдання 2]
B1 --> C1[Мале завдання 1]
B1 --> C2[Мале завдання 2]
B2 --> C3[Мале завдання 3]
B2 --> C4[Мале завдання 4]
C1 --> D[Об’єднання результатів]
C2 --> D
C3 --> D
C4 --> D
5. RecursiveAction — якщо не потрібно повертати результат
Якщо потрібно просто щось виконати паралельно і не повертати результат, використовуйте RecursiveAction. Типові приклади — розпаралелити заповнення масиву, друк, сортування «на місці» тощо.
import java.util.concurrent.RecursiveAction;
public class PrintTask extends RecursiveAction {
private static final int THRESHOLD = 100;
private final int[] array;
private final int start, end;
public PrintTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start <= THRESHOLD) {
for (int i = start; i < end; i++) {
System.out.print(array[i] + " ");
}
} else {
int mid = (start + end) / 2;
invokeAll(
new PrintTask(array, start, mid),
new PrintTask(array, mid, end)
);
}
}
}
6. Практика: паралельний пошук максимального значення в масиві
import java.util.concurrent.RecursiveTask;
public class MaxFindTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 1000;
private final int[] array;
private final int start, end;
public MaxFindTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if (end - start <= THRESHOLD) {
int max = array[start];
for (int i = start + 1; i < end; i++) {
if (array[i] > max) max = array[i];
}
return max;
} else {
int mid = (start + end) / 2;
MaxFindTask left = new MaxFindTask(array, start, mid);
MaxFindTask right = new MaxFindTask(array, mid, end);
left.fork();
int rightResult = right.compute();
int leftResult = left.join();
return Math.max(leftResult, rightResult);
}
}
}
Запуск:
import java.util.concurrent.ForkJoinPool;
public class ForkJoinMaxDemo {
public static void main(String[] args) {
int[] array = new int[5_000_000];
for (int i = 0; i < array.length; i++) {
array[i] = (int)(Math.random() * 1_000_000);
}
ForkJoinPool pool = new ForkJoinPool();
MaxFindTask task = new MaxFindTask(array, 0, array.length);
int max = pool.invoke(task);
System.out.println("Максимальне значення: " + max);
}
}
7. Переваги та обмеження ForkJoinPool
Переваги
- Автоматичне балансування навантаження. Work-stealing дає змогу ефективно використовувати всі ядра.
- Зручність. Не потрібно вручну створювати й керувати потоками.
- Висока продуктивність. Особливо на великих завданнях і багатоядерних системах.
- Гнучкість. Можна ділити завдання на стільки частин, скільки потрібно.
Обмеження
- Сильна зв’язаність між підзавданнями. Якщо підзавдання часто чекають одне одного, вигода знижується.
- Надто дрібні завдання. Накладні витрати на поділ/синхронізацію можуть «з’їсти» перевагу.
- Побічні ефекти. Не можна змінювати спільні змінні без синхронізації — отримаєте гонку даних (race condition).
- Застосовність. Добре підходить для завдань, що діляться на незалежні частини.
8. Типові помилки під час роботи з ForkJoinPool і RecursiveTask
Помилка №1: Надто дрібний поділ завдання. Якщо поріг (THRESHOLD) надто малий, вийде багато крихітних завдань — витрати на їх створення й синхронізацію перевищать виграш від паралелізму. Експериментуйте з порогом: оптимальні значення часто у тисячах або десятках тисяч елементів.
Помилка №2: Використання спільних змінюваних змінних. Якщо підзавдання пишуть у спільну змінну без синхронізації — отримаєте гонку даних (race condition). Повертайте результат через compute() і об’єднуйте лише в join().
Помилка №3: Неправильне використання fork/join. Забули викликати fork() або join() — і підзавдання не запуститься паралельно або результат «загубиться». Уважно стежте за порядком викликів.
Помилка №4: Запуск ForkJoinTask поза ForkJoinPool. Якщо просто викликати compute() у завдання, воно виконається в поточному потоці, без паралелізму. Для справжньої магії використовуйте pool.invoke() або pool.submit().
Помилка №5: Ігнорування винятків. Якщо в завданні стався виняток, він виявиться під час виклику join() або invoke(). Не забувайте обробляти помилки.
Помилка №6: Використання ForkJoinPool для задач із блокуваннями. ForkJoinPool погано підходить для завдань, які часто блокуються (очікують I/O тощо). У таких випадках краще використовувати ExecutorService.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ