Разберемся со следующим методом, который подготавливает нам ExecutorServicenewWorkStealingPool.

Данный пул потоков особенный — концепция его работы стоит на “краже” работы.

Задачи собираются в очередь и распределяются по процессорам. Но если процессор занят, то другой свободный процессор может украсть у него задачу и ее выполнить. Такой формат был введен в Java для того, чтобы сократить споры в многопоточных приложениях. В основе лежит фреймворк fork/join.

fork/join

В фреймворке fork/join задачи рекурсивно декомпозируются, то есть разбираются на подзадачи. Далее задачи выполняются индивидуально, а результаты подзадач объединяются для формирования результатов главной задачи.

Метод fork запускает асинхронно в некотором потоке задачу, а метод join позволяет дождаться завершения работы над этой задачей.

newWorkStealingPool

Метод newWorkStealingPool имеет две реализации:


public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
 
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

Мы сразу же замечаем, что под капотом мы не вызываем конструктор у ThreadPoolExecutor, здесь мы работаем с сущностью ForkJoinPool. Это, как и ThreadPoolExecutor, — реализация AbstractExecutorService.

На выбор нам предоставляют 2 метода. Отличаются они тем, что в первом случае мы сами указываем какой уровень параллелизма мы хотим видеть. Если мы не указываем это значение, то в нашем пуле мы увидим уровень параллелизма равный количеству ядер процессора доступных виртуальной машине Java в данный момент времени.

Осталось разобраться на деле как это работает:


Collection<Callable<Void>> tasks = new ArrayList<>();
        ExecutorService executorService = Executors.newWorkStealingPool(10);
 
        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            Callable<Void> callable = () -> {
                System.out.println("Обработан запрос пользователя №" + taskNumber + " на потоке " + Thread.currentThread().getName());
                return null;
            };
            tasks.add(callable);
        }
        executorService.invokeAll(tasks);

Мы создаем 10 задач, где выводим статус их выполнения. После чего все задачи мы запускаем в работу с помощью метода invokeAll.

Результаты при выполнении 10 задач на 10 потоках в пуле:

Обработан запрос пользователя №9 на потоке ForkJoinPool-1-worker-10
Обработан запрос пользователя №4 на потоке ForkJoinPool-1-worker-5
Обработан запрос пользователя №7 на потоке ForkJoinPool-1-worker-8
Обработан запрос пользователя №1 на потоке ForkJoinPool-1-worker-2
Обработан запрос пользователя №2 на потоке ForkJoinPool-1-worker-3
Обработан запрос пользователя №3 на потоке ForkJoinPool-1-worker-4
Обработан запрос пользователя №6 на потоке ForkJoinPool-1-worker-7
Обработан запрос пользователя №0 на потоке ForkJoinPool-1-worker-1
Обработан запрос пользователя №5 на потоке ForkJoinPool-1-worker-6
Обработан запрос пользователя №8 на потоке ForkJoinPool-1-worker-9

Мы видим, что после формирования очереди потоки брали задачи на выполнение. Также можно проверить, как поведет себя распределение между потоками из пула для 20 задач на 10 потоках.

Обработан запрос пользователя №3 на потоке ForkJoinPool-1-worker-4
Обработан запрос пользователя №7 на потоке ForkJoinPool-1-worker-8
Обработан запрос пользователя №2 на потоке ForkJoinPool-1-worker-3
Обработан запрос пользователя №4 на потоке ForkJoinPool-1-worker-5
Обработан запрос пользователя №1 на потоке ForkJoinPool-1-worker-2
Обработан запрос пользователя №5 на потоке ForkJoinPool-1-worker-6
Обработан запрос пользователя №8 на потоке ForkJoinPool-1-worker-9
Обработан запрос пользователя №9 на потоке ForkJoinPool-1-worker-10
Обработан запрос пользователя №0 на потоке ForkJoinPool-1-worker-1
Обработан запрос пользователя №6 на потоке ForkJoinPool-1-worker-7
Обработан запрос пользователя №10 на потоке ForkJoinPool-1-worker-9
Обработан запрос пользователя №12 на потоке ForkJoinPool-1-worker-1
Обработан запрос пользователя №13 на потоке ForkJoinPool-1-worker-8
Обработан запрос пользователя №11 на потоке ForkJoinPool-1-worker-6
Обработан запрос пользователя №15 на потоке ForkJoinPool-1-worker-8
Обработан запрос пользователя №14 на потоке ForkJoinPool-1-worker-1
Обработан запрос пользователя №17 на потоке ForkJoinPool-1-worker-6
Обработан запрос пользователя №16 на потоке ForkJoinPool-1-worker-7
Обработан запрос пользователя №19 на потоке ForkJoinPool-1-worker-6
Обработан запрос пользователя №18 на потоке ForkJoinPool-1-worker-1

Из вывода видно, что некоторые потоки успевают выполнить несколько задач (ForkJoinPool-1-worker-6 выполнил 4 задачи), а некоторые — только одну (ForkJoinPool-1-worker-2). Если в реализацию метода call добавить задержку в 1 секунду, картина изменится.


Callable<Void> callable = () -> {
   System.out.println("Обработан запрос пользователя №" + taskNumber + " на потоке " + Thread.currentThread().getName());
   TimeUnit.SECONDS.sleep(1);
   return null;
};

Ради эксперимента, выполним этот же код на другой машине. Полученный вывод:

Обработан запрос пользователя №2 на потоке ForkJoinPool-1-worker-23
Обработан запрос пользователя №7 на потоке ForkJoinPool-1-worker-31
Обработан запрос пользователя №4 на потоке ForkJoinPool-1-worker-27
Обработан запрос пользователя №5 на потоке ForkJoinPool-1-worker-13
Обработан запрос пользователя №0 на потоке ForkJoinPool-1-worker-19
Обработан запрос пользователя №8 на потоке ForkJoinPool-1-worker-3
Обработан запрос пользователя №9 на потоке ForkJoinPool-1-worker-21
Обработан запрос пользователя №6 на потоке ForkJoinPool-1-worker-17
Обработан запрос пользователя №3 на потоке ForkJoinPool-1-worker-9
Обработан запрос пользователя №1 на потоке ForkJoinPool-1-worker-5
Обработан запрос пользователя №12 на потоке ForkJoinPool-1-worker-23
Обработан запрос пользователя №15 на потоке ForkJoinPool-1-worker-19
Обработан запрос пользователя №14 на потоке ForkJoinPool-1-worker-27
Обработан запрос пользователя №11 на потоке ForkJoinPool-1-worker-3
Обработан запрос пользователя №13 на потоке ForkJoinPool-1-worker-13
Обработан запрос пользователя №10 на потоке ForkJoinPool-1-worker-31
Обработан запрос пользователя №18 на потоке ForkJoinPool-1-worker-5
Обработан запрос пользователя №16 на потоке ForkJoinPool-1-worker-9
Обработан запрос пользователя №17 на потоке ForkJoinPool-1-worker-21
Обработан запрос пользователя №19 на потоке ForkJoinPool-1-worker-17

Из интересного в этот выводе можно отметить, что мы “заказывали” потоки в пуле. А имена воркеров идут не с единицы до десяти включительно, а больше. Если смотреть по уникальным названиям, то воркеров действительно десять (3, 5, 9, 13, 17, 19, 21, 23, 27 и 31). Возникает резонный вопрос: почему так произошло? В любой непонятной ситуации используй debug.

Этим и займемся. Приведем объект executorService к типу ForkJoinPool:


final ForkJoinPool forkJoinPool = (ForkJoinPool) executorService;

Смотреть будем именно на этот объект после вызова метода invokeAll в режиме Evaluate Expression. Для этого после метода invokeAll добавим любую команду, например, пустой sout, и на нем поставим брейкпоинт.

Видим, что в пуле 10 потоков, но массив потоков (воркеров) имеет размерность 32. Странно, но ладно, будем копать дальше. Попробуем при создании пула параметр параллелизма поставить больше, чем 32, например, 40.


ExecutorService executorService = Executors.newWorkStealingPool(40);

И в дебаге еще раз посмотрим на объект forkJoinPool.

Теперь размер массива воркеров 128. Можно предположить, что это внутренняя оптимизация JVM. Давай попробуем ее найти в коде JDK (openjdk-14):

Да, действительно: размер массива воркеров рассчитывается исходя из значения параллелизма, и побитовых манипуляций с ним. Не нужно пытаться разобраться что именно здесь происходит. Просто достаточно знать сам факт наличия такой оптимизации.

Еще одна особенность нашего примера — использование метода invokeAll. Стоит отметить, что метод invokeAll нам может вернуть результат, вернее лист результатов (в нашем случае — List<Future<Void>>), где мы можем получить результат выполнения каждой из задач.


var results = executorService.invokeAll(tasks);
        for (Future<Void> result : results) {
            // Обработать результат выполнения задачи
        }

Такой особенный вид сервиса и пула потоков можно использовать в задачах, где прогнозируется уровень параллелизма, либо не прогнозируется, но подразумевается.