JavaRush /Курси /JAVA 25 SELF /ForkJoinPool і RecursiveTask: рекурсивні завдання

ForkJoinPool і RecursiveTask: рекурсивні завдання

JAVA 25 SELF
Рівень 54 , Лекція 3
Відкрита

1. ForkJoinPool: що це таке і навіщо він потрібен

ForkJoinPool — це спеціальний пул потоків, що реалізує підхід «розділяй і володарюй» (divide and conquer). Його завдання — максимально ефективно розпаралелювати роботу, коли велику задачу можна розбити на низку незалежних підзавдань, виконати їх паралельно, а потім об’єднати результати.

  • Fork (поділити) — завдання ділиться на підзавдання.
  • Join (об’єднати) — результати підзавдань збираються в підсумок.

ForkJoinPool — це серце паралельних стрімів у Java: коли ви пишете list.parallelStream(), всередині використовується саме він. Але ви можете застосовувати його і напряму, отримуючи більше контролю.

Коли ForkJoinPool особливо корисний

ForkJoinPool розкривається на завданнях, які легко ділити на незалежні частини: наприклад, обробка дуже великих масивів, коли кожен фрагмент обробляється окремо, а потім результати об’єднуються.

  • Завдання легко розбити на незалежні підзавдання: сортування, пошук, підсумовування.
  • Підзавдання приблизно однакові за обсягом і не залежать одне від одного.
  • Потрібно задіяти всі ядра процесора для максимальної швидкості.
+---------------------+
|   Велике завдання   |
+---------------------+
          |
          v
+---------+---------+
|  Підзавдання 1    |
|  Підзавдання 2    |
|  ...              |
+-------------------+
          |
          v
+---------+---------+
|  Результати       |
+-------------------+

Саме так працює «розділяй і володарюй»: розбили — порахували паралельно — об’єднали.

2. RecursiveTask і RecursiveAction: дві сторони однієї медалі

У ForkJoinPool завдання оформлюються через спеціальні класи, здатні ділитися на підзавдання та об’єднувати результати. RecursiveTask<T> повертає результат, а RecursiveAction — ні. На практиці частіше використовують RecursiveTask, щоб, наприклад, повернути суму, максимум або кількість.

Щоб створити таке завдання, наслідуємося та реалізуємо метод compute(). У ньому описуємо логіку: якщо завдання маленьке — розв’язуємо відразу; якщо велике — ділимо на підзавдання, запускаємо їх паралельно через fork() і об’єднуємо результати за допомогою join(). Так народжується природний рекурсивний паралелізм.

3. Синтаксис і приклад: паралельне обчислення суми масиву

Припустімо, у нас є великий масив чисел, і ми хочемо швидко підрахувати суму всіх елементів.

Крок 1. Клас завдання

import java.util.concurrent.RecursiveTask;

public class ArraySumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // Поріг для поділу завдання
    private final int[] array;
    private final int start, end;

    public ArraySumTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        // Якщо завдання маленьке — рахуємо напряму
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        } else {
            // Ділимо завдання на дві підзадачі
            int mid = (start + end) / 2;
            ArraySumTask leftTask = new ArraySumTask(array, start, mid);
            ArraySumTask rightTask = new ArraySumTask(array, mid, end);

            // Запускаємо підзадачі паралельно
            leftTask.fork(); // Асинхронно
            long rightResult = rightTask.compute(); // Синхронно
            long leftResult = leftTask.join(); // Чекаємо завершення лівої

            // Об’єднуємо результат
            return leftResult + rightResult;
        }
    }
}
  • Якщо завдання маленьке (менше за поріг THRESHOLD) — рахуємо суму звичайним циклом.
  • Якщо велике — ділимо на дві, одну запускаємо асинхронно через fork(), другу рахуємо синхронно через compute(), потім об’єднуємо через join().

Крок 2. Запуск завдання через ForkJoinPool

import java.util.concurrent.ForkJoinPool;

public class ForkJoinSumDemo {
    public static void main(String[] args) {
        int[] numbers = new int[10_000_000];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = 1; // Для простоти — сума має дорівнювати довжині масиву
        }

        ForkJoinPool pool = new ForkJoinPool(); // Типово — за кількістю ядер

        ArraySumTask task = new ArraySumTask(numbers, 0, numbers.length);

        long result = pool.invoke(task); // Запуск завдання

        System.out.println("Сума елементів масиву: " + result);
    }
}

Як це працює?

  • ForkJoinPool сам вирішує, скільки потоків використовувати (зазвичай — за кількістю ядер).
  • Завдання автоматично ділиться на підзавдання, кожне з яких може виконуватися на окремому ядрі.
  • Продуктивність зазвичай вища, ніж у послідовного коду (особливо на великих даних і багатоядерних системах).

4. Як працює ForkJoinPool: трохи «під капотом»

Work-Stealing (крадіжка роботи)

ForkJoinPool реалізує «крадіжку роботи»: якщо в якогось потоку закінчилися завдання, він «краде» роботу в іншого потоку. Це забезпечує ефективне балансування навантаження і завантаження всіх ядер.

Базовий алгоритм

  • Головне завдання ділиться на підзавдання.
  • Підзавдання поміщаються у спеціалізовані черги.
  • Потоки беруть завдання зі своїх черг, а за їх спустошення — «шукають» роботу в сусідів.
  • Коли все виконано, результати об’єднуються.

Схема роботи

flowchart TD
    A[Головне завдання] --> B1[Підзавдання 1]
    A --> B2[Підзавдання 2]
    B1 --> C1[Мале завдання 1]
    B1 --> C2[Мале завдання 2]
    B2 --> C3[Мале завдання 3]
    B2 --> C4[Мале завдання 4]
    C1 --> D[Об’єднання результатів]
    C2 --> D
    C3 --> D
    C4 --> D

5. RecursiveAction — якщо не потрібно повертати результат

Якщо потрібно просто щось виконати паралельно і не повертати результат, використовуйте RecursiveAction. Типові приклади — розпаралелити заповнення масиву, друк, сортування «на місці» тощо.

import java.util.concurrent.RecursiveAction;

public class PrintTask extends RecursiveAction {
    private static final int THRESHOLD = 100;
    private final int[] array;
    private final int start, end;

    public PrintTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            for (int i = start; i < end; i++) {
                System.out.print(array[i] + " ");
            }
        } else {
            int mid = (start + end) / 2;
            invokeAll(
                new PrintTask(array, start, mid),
                new PrintTask(array, mid, end)
            );
        }
    }
}

6. Практика: паралельний пошук максимального значення в масиві

import java.util.concurrent.RecursiveTask;

public class MaxFindTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 1000;
    private final int[] array;
    private final int start, end;

    public MaxFindTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end - start <= THRESHOLD) {
            int max = array[start];
            for (int i = start + 1; i < end; i++) {
                if (array[i] > max) max = array[i];
            }
            return max;
        } else {
            int mid = (start + end) / 2;
            MaxFindTask left = new MaxFindTask(array, start, mid);
            MaxFindTask right = new MaxFindTask(array, mid, end);
            left.fork();
            int rightResult = right.compute();
            int leftResult = left.join();
            return Math.max(leftResult, rightResult);
        }
    }
}

Запуск:

import java.util.concurrent.ForkJoinPool;

public class ForkJoinMaxDemo {
    public static void main(String[] args) {
        int[] array = new int[5_000_000];
        for (int i = 0; i < array.length; i++) {
            array[i] = (int)(Math.random() * 1_000_000);
        }

        ForkJoinPool pool = new ForkJoinPool();
        MaxFindTask task = new MaxFindTask(array, 0, array.length);

        int max = pool.invoke(task);

        System.out.println("Максимальне значення: " + max);
    }
}

7. Переваги та обмеження ForkJoinPool

Переваги

  • Автоматичне балансування навантаження. Work-stealing дає змогу ефективно використовувати всі ядра.
  • Зручність. Не потрібно вручну створювати й керувати потоками.
  • Висока продуктивність. Особливо на великих завданнях і багатоядерних системах.
  • Гнучкість. Можна ділити завдання на стільки частин, скільки потрібно.

Обмеження

  • Сильна зв’язаність між підзавданнями. Якщо підзавдання часто чекають одне одного, вигода знижується.
  • Надто дрібні завдання. Накладні витрати на поділ/синхронізацію можуть «з’їсти» перевагу.
  • Побічні ефекти. Не можна змінювати спільні змінні без синхронізації — отримаєте гонку даних (race condition).
  • Застосовність. Добре підходить для завдань, що діляться на незалежні частини.

8. Типові помилки під час роботи з ForkJoinPool і RecursiveTask

Помилка №1: Надто дрібний поділ завдання. Якщо поріг (THRESHOLD) надто малий, вийде багато крихітних завдань — витрати на їх створення й синхронізацію перевищать виграш від паралелізму. Експериментуйте з порогом: оптимальні значення часто у тисячах або десятках тисяч елементів.

Помилка №2: Використання спільних змінюваних змінних. Якщо підзавдання пишуть у спільну змінну без синхронізації — отримаєте гонку даних (race condition). Повертайте результат через compute() і об’єднуйте лише в join().

Помилка №3: Неправильне використання fork/join. Забули викликати fork() або join() — і підзавдання не запуститься паралельно або результат «загубиться». Уважно стежте за порядком викликів.

Помилка №4: Запуск ForkJoinTask поза ForkJoinPool. Якщо просто викликати compute() у завдання, воно виконається в поточному потоці, без паралелізму. Для справжньої магії використовуйте pool.invoke() або pool.submit().

Помилка №5: Ігнорування винятків. Якщо в завданні стався виняток, він виявиться під час виклику join() або invoke(). Не забувайте обробляти помилки.

Помилка №6: Використання ForkJoinPool для задач із блокуваннями. ForkJoinPool погано підходить для завдань, які часто блокуються (очікують I/O тощо). У таких випадках краще використовувати ExecutorService.

Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ