Розберемося з наступним методом, який підготовляє нам ExecutorServicenewWorkStealingPool.

Цей пул потоків особливий — концепція його роботи заснована на “крадіжці” роботи.

Завдання збираються до черги й розподіляються за процесорами. Але якщо процесор зайнятий, інший вільний процесор може вкрасти в нього завдання та виконати його. Такий формат ввели в Java для того, щоб скоротити суперечки у багатопотокових застосунках. В основі лежить фреймворк fork/join.

fork/join

У фреймворку fork/join завдання рекурсивно декомпозуються, тобто розбираються на підзавдання. Далі завдання виконуються індивідуально, а результати підзадач об'єднуються для формування результатів головного завдання.

Метод fork асинхронно запускає в певному потоці завдання, а метод join дозволяє дочекатися на завершення роботи над цим завданням.

newWorkStealingPool

Метод newWorkStealingPool має дві реалізації:


public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
 
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

Ми одразу ж помічаємо, що під капотом ми не викликаємо конструктор у ThreadPoolExecutor. Тут ми працюємо із сутністю ForkJoinPool. Це, як і ThreadPoolExecutor, — реалізація AbstractExecutorService.

На вибір нам дають два методи. Відрізняються вони тим, що у першому випадку ми самі вказуємо, який рівень паралелізму хочемо бачити. Якщо ми не встановлюємо це значення, то в нашому пулі ми побачимо рівень паралелізму, що дорівнює кількості ядер процесора, доступних віртуальній машині Java в даний момент часу.

Залишилося розібратися, як це працює в дійсності:


Collection<Callable<Void>> tasks = new ArrayList<>();
        ExecutorService executorService = Executors.newWorkStealingPool(10);
 
        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            Callable<Void> callable = () -> {
                System.out.println("Опрацьовано запит користувача №" + taskNumber + " на потоці " + Thread.currentThread().getName());
                return null;
            };
            tasks.add(callable);
        }
        executorService.invokeAll(tasks);

Ми створюємо 10 завдань, де виводимо статус виконання. Після цього ми запускаємо в роботу всі завдання за допомогою методу invokeAll.

Результати при виконанні 10 завдань на 10 потоках у пулі:

Опрацьовано запит користувача №9 на потоці ForkJoinPool-1-worker-10
Опрацьовано запит користувача №4 на потоці ForkJoinPool-1-worker-5
Опрацьовано запит користувача №7 на потоці ForkJoinPool-1-worker-8
Опрацьовано запит користувача №1 на потоці ForkJoinPool-1-worker-2
Опрацьовано запит користувача №2 на потоці ForkJoinPool-1-worker-3
Опрацьовано запит користувача №3 на потоці ForkJoinPool-1-worker-4
Опрацьовано запит користувача №6 на потоці ForkJoinPool-1-worker-7
Опрацьовано запит користувача №0 на потоці ForkJoinPool-1-worker-1
Опрацьовано запит користувача №5 на потоці ForkJoinPool-1-worker-6
Опрацьовано запит користувача №8 на потоці ForkJoinPool-1-worker-9

Ми бачимо, що після формування черги потоки брали завдання на виконання. Також можна перевірити, як поведе себе розподіл між потоками з пулу для 20 завдань на 10 потоках.

Опрацьовано запит користувача №3 на потоці ForkJoinPool-1-worker-4
Опрацьовано запит користувача №7 на потоці ForkJoinPool-1-worker-8
Опрацьовано запит користувача №2 на потоці ForkJoinPool-1-worker-3
Опрацьовано запит користувача №4 на потоці ForkJoinPool-1-worker-5
Опрацьовано запит користувача №1 на потоці ForkJoinPool-1-worker-2
Опрацьовано запит користувача №5 на потоці ForkJoinPool-1-worker-6
Опрацьовано запит користувача №8 на потоці ForkJoinPool-1-worker-9
Опрацьовано запит користувача №9 на потоці ForkJoinPool-1-worker-10
Опрацьовано запит користувача №0 на потоці ForkJoinPool-1-worker-1
Опрацьовано запит користувача №6 на потоці ForkJoinPool-1-worker-7
Опрацьовано запит користувача №10 на потоці ForkJoinPool-1-worker-9
Опрацьовано запит користувача №12 на потоці ForkJoinPool-1-worker-1
Опрацьовано запит користувача №13 на потоці ForkJoinPool-1-worker-8
Опрацьовано запит користувача №11 на потоці ForkJoinPool-1-worker-6
Опрацьовано запит користувача №15 на потоці ForkJoinPool-1-worker-8
Опрацьовано запит користувача №14 на потоці ForkJoinPool-1-worker-1
Опрацьовано запит користувача №17 на потоці ForkJoinPool-1-worker-6
Опрацьовано запит користувача №16 на потоці ForkJoinPool-1-worker-7
Опрацьовано запит користувача №19 на потоці ForkJoinPool-1-worker-6
Опрацьовано запит користувача №18 на потоці ForkJoinPool-1-worker-1

Із виведення зрозуміло, що деякі потоки встигають виконати кілька завдань (ForkJoinPool-1-worker-6 виконав 4 завдання), а деякі — тільки одне (ForkJoinPool-1-worker-2). Якщо у реалізацію методу call додати затримку в 1 секунду, картина зміниться.


Callable<Void> callable = () -> {
   System.out.println("Опрацьовано запит користувача №" + taskNumber + " на потоці " + Thread.currentThread().getName());
   TimeUnit.SECONDS.sleep(1);
   return null;
};

Заради експерименту виконаємо цей же код на іншій машині. Отримаємо таке виведення:

Опрацьовано запит користувача №2 на потоці ForkJoinPool-1-worker-23
Опрацьовано запит користувача №7 на потоці ForkJoinPool-1-worker-31
Опрацьовано запит користувача №4 на потоці ForkJoinPool-1-worker-27
Опрацьовано запит користувача №5 на потоці ForkJoinPool-1-worker-13
Опрацьовано запит користувача №0 на потоці ForkJoinPool-1-worker-19
Опрацьовано запит користувача №8 на потоці ForkJoinPool-1-worker-3
Опрацьовано запит користувача №9 на потоці ForkJoinPool-1-worker-21
Опрацьовано запит користувача №6 на потоці ForkJoinPool-1-worker-17
Опрацьовано запит користувача №3 на потоці ForkJoinPool-1-worker-9
Опрацьовано запит користувача №1 на потоці ForkJoinPool-1-worker-5
Опрацьовано запит користувача №12 на потоці ForkJoinPool-1-worker-23
Опрацьовано запит користувача №15 на потоці ForkJoinPool-1-worker-19
Опрацьовано запит користувача №14 на потоці ForkJoinPool-1-worker-27
Опрацьовано запит користувача №11 на потоці ForkJoinPool-1-worker-3
Опрацьовано запит користувача №13 на потоці ForkJoinPool-1-worker-13
Опрацьовано запит користувача №10 на потоці ForkJoinPool-1-worker-31
Опрацьовано запит користувача №18 на потоці ForkJoinPool-1-worker-5
Опрацьовано запит користувача №16 на потоці ForkJoinPool-1-worker-9
Опрацьовано запит користувача №17 на потоці ForkJoinPool-1-worker-21
Опрацьовано запит користувача №19 на потоці ForkJoinPool-1-worker-17

Із цікавого в цьому виведенні можна відзначити те, що ми “замовляли” потоки у пулі. А імена воркерів йдуть не з одиниці до десяти включно, а більше. Якщо дивитися за унікальними назвами, то воркерів насправді десять (3, 5, 9, 13, 17, 19, 21, 23, 27 і 31). Виникає резонне запитання: чому так сталося? У будь-якій незрозумілій ситуації використовуй debug.

Цим і займемося. Приведемо об'єкт executorService до типу ForkJoinPool:


final ForkJoinPool forkJoinPool = (ForkJoinPool) executorService;

Дивитимемося саме на цей об'єкт після виклику методу invokeAll в режимі Evaluate Expression. Для цього після методу invokeAll додамо будь-яку команду, наприклад, порожній sout, і на ньому поставимо брейкпоінт.

Бачимо, що в пулі 10 потоків, але масив потоків (воркерів) має розмірність 32. Дивно, але добре, копатимемо далі. Спробуємо під час створення пулу параметр паралелізму поставити більше, ніж 32. Наприклад, 40.


ExecutorService executorService = Executors.newWorkStealingPool(40);

І в дебазі ще раз подивимось на об'єкт forkJoinPool.

Тепер розмір масиву воркерів – 128. Можна припустити, що це є внутрішня оптимізація JVM. Давай спробуємо знайти її в коді JDK (openjdk-14):

Так, дійсно: розмір масиву воркерів розраховується з огляду на значення паралелізму та побітових маніпуляцій із ним. Не треба намагатися розібратися, що саме тут відбувається. Просто достатньо знати про сам факт наявності такої оптимізації.

Ще одна особливість нашого прикладу — використання методу invokeAll. Варто зазначити, що метод invokeAll нам може повернути результат, точніше список результатів (у нашому випадку — List<Future<Void>>), де ми можемо отримати результат виконання кожного із завдань.


var results = executorService.invokeAll(tasks);
        for (Future<Void> result : results) {
            // Опрацювати результат виконання завдання
        }

Такий особливий вид сервісу та пулу потоків можна використовувати в завданнях, де прогнозується рівень паралелізму, або не прогнозується, але мається на увазі.