Вступ
Отже, ми знаємо, що в Java є потоки, про що можна прочитати в огляді "Thread'ом Java не зіпсуєш : Частина I - потоки". Давайте ще раз подивимося на типовий код:
public static void main(String []args) throws Exception {
Runnable task = () -> {
System.out.println("Задача виконана");
};
Thread thread = new Thread(task);
thread.start();
}
Як ми бачимо, код для запуску задачі досить типовий, але для кожного нового запуску нам його доведеться повторювати. Одне із рішень — винести його в окремий метод, наприклад, execute(Runnable runnable)
. Але розробники Java вже подбали про нас і придумали інтерфейс Executor
:
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Задача виконана");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Як видно, код став лаконічнішим і дозволив нам просто написати код для запуску Runnable
у потоці. Круто, чи не так? Але це лише початок:

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
є інтерфейс-нащадок ExecutorService
. У JavaDoc цього інтерфейсу сказано, що ExecutorService
є описом особливого Executor
, який надає методи зупинки роботи Executor
і дозволяє отримати java.util.concurrent.Future
, щоб відстежувати процес виконання. Раніше, в "Thread'ом Java не зіпсуєш : Частина IV — Callable, Future і друзі" ми вже коротко розглянули можливості Future
. Хто забув або не читав - раджу освіжити в пам'яті ;)
Що ще цікавого в JavaDoc написано? Що у нас є спеціальна фабрика java.util.concurrent.Executors
, яка дозволяє створювати доступні за замовчуванням реалізації ExecutorService
.
ExecutorService
Ще раз згадаємо. У нас єExecutor
для execute (тобто виконання) певної задачі у потоці, коли реалізація створення потоку захована від нас. У нас є ExecutorService
— особливий Executor
, який має набір можливостей з управління ходом виконання. І у нас є фабрика Executors
, яка дозволяє створювати ExecutorService
. Давайте тепер це зробимо самі:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Як ми бачимо, ми вказали фіксований пул потоків (Fixed Thread Pool
) розміром 2. Після чого ми по черзі надсилаємо в пул задачі. Кожна задача повертає рядок (String
), який містить ім'я потоку (currentThread().getName()
). Важливо наприкінці виконати shutdown для ExecutorService
, адже інакше наша програма не завершиться.
У фабриці Executors
є й інші фабричні методи. Наприклад, ми можемо створити пул лише з одного потоку — newSingleThreadExecutor
або пул із кешуванням newCachedThreadPool
, коли потоки будуть видалятися з пулу, якщо вони простоюють 1 хвилину.
Насправді, за цими ExecutorService
ховається блокуюча черга, в яку поміщаються задачі і з якої ці задачі виконуються. Докладніше про блокуючі черги можна подивитися у відео "Блокуюча черга - Collections #5 - Advanced Java". А також можна прочитати огляд "Блокуючі черги пакету concurrent" і відповідь на запитання "When to prefer LinkedBlockingQueue over ArrayBlockingQueue?". Супер спрощено — BlockingQueue
(блокуюча черга) блокує потік у двох випадках:
- потік намагається отримати елементи з порожньої черги
- потік намагається покласти елементи у повну чергу
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
або
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Як ми бачимо, всередині фабричних методів створюються реалізації ExecutorService
. І в основному це ThreadPoolExecutor
. Змінюються тільки атрибути, які і впливають на роботу.

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
Як ми раніше побачили, всередині фабричних методів в основному створюєтьсяThreadPoolExecutor
. На функціональність впливає те, які значення передані як максимуму та мінімуму потоків, а також яка черга використовується. А використовуватися може будь-яка реалізація інтерфейсу java.util.concurrent.BlockingQueue
.
Говорячи про ThreadPoolExecutor
, варто зазначити цікаві особливості при роботі. Наприклад, не можна надсилати задачі в ThreadPoolExecutor
, якщо там немає місця:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Цей код впаде з помилкою виду:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Тобто task
не можна засабмитити, адже SynchronousQueue
влаштована так, що фактично складається з одного елемента і не дозволяє покласти туди більше. Як ми бачимо, queued tasks
тут 0, і в цьому немає нічого дивного, адже це специфіка SynchronousQueue
— фактично це черга в 1 елемент, яка завжди порожня. (!) Коли один потік кладе в чергу елемент, він чекатиме, поки інший потік не забере елемент з черги.
Тому, ми можемо змінити на new LinkedBlockingQueue<>(1)
і в помилці зміниться те, що буде вказано queued tasks = 1
. Адже черга лише на 1 елемент, то другий ми вже не можемо покласти. І впадемо на цьому.
Продовжуючи тему черги, варто зазначити, що клас ThreadPoolExecutor
має додаткові методи з обслуговування черги. Наприклад, метод threadPoolExecutor.purge()
видалить з черги всі скасовані завдання, щоб звільнити місце в черзі. Ще однією цікавою функцією, пов'язаною з чергою, є обробник неприйнятих завдань:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Для прикладу обробник просто виводить слово Rejected
на кожну відмову приймати завдання в чергу. Зручно, чи не так?
Крім того, ThreadPoolExecutor
має цікавого нащадка — ScheduledThreadPoolExecutor
, який є ScheduledExecutorService
. Він надає можливість виконувати завдання за таймером.
ScheduledExecutorService
ExecutorService
типу ScheduledExecutorService
дозволяють запускати завдання за розкладом (schedule).
Подивимось на приклад:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Тут все просто. Відправляються завдання, отримуємо "заплановане завдання" java.util.concurrent.ScheduledFuture
.
З розкладом може бути корисний також і наступний випадок:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Тут ми відправляємо Runnable
завдання на виконання з фіксованою частотою (Fixed Rate) з певною затримкою. У даному випадку, через 1 секунду кожні 2 секунди почати виконувати завдання.
Є подібний варіант:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Але тут завдання виконуються із заданим проміжком МІЖ виконанням різних завдань. Тобто завдання task
буде виконане через 1 секунду. Потім, як тільки воно буде завершене, пройде 2 секунди, і тоді нове завдання task буде запущене.
Про цю тему можна почитати наступні матеріали:
- An introduction to thread pools
- Introduction to Thread Pools
- Java Multithreading Steeplechase: Cancelling Tasks In Executors
- Picking correct Java executors for background tasks

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
Поміж вказаних вище пулів потоків, є ще один. Можна сказати, що він трохи особливий. Ім'я йому — Work Stealing Pool. Якщо коротко, то Work Stealing — це такий алгоритм роботи, за якого потоки, що простоюють, починають забирати завдання інших потоків або завдання зі спільної черги. Подивимось на приклад:
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Якщо ми запустимо цей код, то ExecutorService
нам створить 5 потоків, тому що кожен потік буде вставати в wait чергу по локу об'єкта lock
. Про монітори та локи по ньому ми вже раніше розбирали у "Thread'ом Java не зіпсуєш: Частина II — синхронізація".
А тепер ми замінимо Executors.newCachedThreadPool
на Executors.newWorkStealingPool()
. Що зміниться?
Ми побачимо, що наші завдання виконуються не в 5 потоків, а менше. Пам'ятайте, що cachedThreadPool
створював для кожного завдання свій потік? Тому що wait
блокував потік, а наступні завдання хотіли виконуватись і в пулі для них створювалися нові потоки. У випадку з StealingPool
потоки не будуть вічно простоювати в wait
, вони почнуть виконувати сусідні завдання.
Чим так відрізняється від решти thread pool цей WorkStealingPool
? Тим, що всередині нього живе насправді чарівний ForkJoinPool
:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Насправді є ще одна відмінність. Потоки, які створюються для ForkJoinPool
за замовчуванням є демон потоками, на відміну від потоків, створених через звичайний ThreadPool
. Загалом, варто пам'ятати про демон-потоки, тому що, наприклад, при CompletableFuture
теж використовуються демон-потоки, якщо не вказати свою ThreadFactory
, яка буде створювати не демон-потоки. Ось такі сюрпризи можуть чекати в несподіваному місці!)
Fork/Join Pool
У цій частині ми поговоримо про той самийForkJoinPool
(його ще називають fork/join framework), який живе "під капотом" у WorkStealingPool
. Загалом, Fork Join Framework з'явився ще в Java 1.7. І хоча вже Java 11 на дворі, але згадати все одно варто. Не найпоширеніше завдання, але досить цікаве.
На просторах мережі є гарний огляд на цю тему: "Fork/Join Framework в Java 7".
Fork/JoinPool
оперує в своїй роботі таким поняттям, як java.util.concurrent.RecursiveTask
. Також є аналог — java.util.concurrent.RecursiveAction
. RecursiveAction не повертають результат. Таким чином RecursiveTask
схожий на Callable
, а RecursiveAction
схожий на Runnable
.
Ну і дивлячись на назву ми бачимо два ключові методи — fork
і join
. Метод fork
запускає асинхронно в окремому потоці певне завдання. А метод join
дозволяє дочекатися завершення виконання роботи.
Існує кілька способів використання:

Підведення підсумків
Отже, ось ми і закінчили чергову частину огляду. Ми розібрались, що спочатку придумалиExecutor
для виконання потоків. Потім вирішили продовжити ідею і придумали ExecutorService
. ExecutorService
дозволяє відправляти завдання на виконання за допомогою submit
і invoke
, а також керувати сервісом, вимикаючи його. Т.к. ExecutorService
'у потрібні реалізації, написали клас з фабричними методами і назвали його Executors
. Він дозволяє створювати пули потоків ThreadPoolExecutor
'и. При цьому існують такі пули потоків, які дозволяють ще і вказати розклад для виконання, а за WorkStealingPool
ховається ForkJoinPool
.
Сподіваюся, тобі було не тільки цікаво вище написане, але і зрозуміло ) Завжди радий пропозиціям і зауваженням.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ