Вступ
Отже, ми знаємо, що в Java є потоки, про що можна прочитати в огляді "Thread'ом Java не зіпсуєш : Частина I - потоки". Давайте ще раз подивимося на типовий код:
public static void main(String []args) throws Exception {
Runnable task = () -> {
System.out.println("Задача виконана");
};
Thread thread = new Thread(task);
thread.start();
}
Як ми бачимо, код для запуску задачі досить типовий, але для кожного нового запуску нам його доведеться повторювати. Одне із рішень — винести його в окремий метод, наприклад, execute(Runnable runnable). Але розробники Java вже подбали про нас і придумали інтерфейс Executor:
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Задача виконана");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Як видно, код став лаконічнішим і дозволив нам просто написати код для запуску Runnable у потоці. Круто, чи не так? Але це лише початок:

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor є інтерфейс-нащадок ExecutorService. У JavaDoc цього інтерфейсу сказано, що ExecutorService є описом особливого Executor, який надає методи зупинки роботи Executor і дозволяє отримати java.util.concurrent.Future, щоб відстежувати процес виконання. Раніше, в "Thread'ом Java не зіпсуєш : Частина IV — Callable, Future і друзі" ми вже коротко розглянули можливості Future. Хто забув або не читав - раджу освіжити в пам'яті ;)
Що ще цікавого в JavaDoc написано? Що у нас є спеціальна фабрика java.util.concurrent.Executors, яка дозволяє створювати доступні за замовчуванням реалізації ExecutorService.
ExecutorService
Ще раз згадаємо. У нас єExecutor для execute (тобто виконання) певної задачі у потоці, коли реалізація створення потоку захована від нас. У нас є ExecutorService — особливий Executor, який має набір можливостей з управління ходом виконання. І у нас є фабрика Executors, яка дозволяє створювати ExecutorService. Давайте тепер це зробимо самі:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Як ми бачимо, ми вказали фіксований пул потоків (Fixed Thread Pool) розміром 2. Після чого ми по черзі надсилаємо в пул задачі. Кожна задача повертає рядок (String), який містить ім'я потоку (currentThread().getName()). Важливо наприкінці виконати shutdown для ExecutorService, адже інакше наша програма не завершиться.
У фабриці Executors є й інші фабричні методи. Наприклад, ми можемо створити пул лише з одного потоку — newSingleThreadExecutor або пул із кешуванням newCachedThreadPool, коли потоки будуть видалятися з пулу, якщо вони простоюють 1 хвилину.
Насправді, за цими ExecutorService ховається блокуюча черга, в яку поміщаються задачі і з якої ці задачі виконуються. Докладніше про блокуючі черги можна подивитися у відео "Блокуюча черга - Collections #5 - Advanced Java". А також можна прочитати огляд "Блокуючі черги пакету concurrent" і відповідь на запитання "When to prefer LinkedBlockingQueue over ArrayBlockingQueue?". Супер спрощено — BlockingQueue (блокуюча черга) блокує потік у двох випадках:
- потік намагається отримати елементи з порожньої черги
- потік намагається покласти елементи у повну чергу
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
або
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Як ми бачимо, всередині фабричних методів створюються реалізації ExecutorService. І в основному це ThreadPoolExecutor. Змінюються тільки атрибути, які і впливають на роботу.

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
Як ми раніше побачили, всередині фабричних методів в основному створюєтьсяThreadPoolExecutor. На функціональність впливає те, які значення передані як максимуму та мінімуму потоків, а також яка черга використовується. А використовуватися може будь-яка реалізація інтерфейсу java.util.concurrent.BlockingQueue.
Говорячи про ThreadPoolExecutor, варто зазначити цікаві особливості при роботі. Наприклад, не можна надсилати задачі в ThreadPoolExecutor, якщо там немає місця:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Цей код впаде з помилкою виду:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Тобто task не можна засабмитити, адже SynchronousQueue влаштована так, що фактично складається з одного елемента і не дозволяє покласти туди більше. Як ми бачимо, queued tasks тут 0, і в цьому немає нічого дивного, адже це специфіка SynchronousQueue — фактично це черга в 1 елемент, яка завжди порожня. (!) Коли один потік кладе в чергу елемент, він чекатиме, поки інший потік не забере елемент з черги.
Тому, ми можемо змінити на new LinkedBlockingQueue<>(1) і в помилці зміниться те, що буде вказано queued tasks = 1. Адже черга лише на 1 елемент, то другий ми вже не можемо покласти. І впадемо на цьому.
Продовжуючи тему черги, варто зазначити, що клас ThreadPoolExecutor має додаткові методи з обслуговування черги. Наприклад, метод threadPoolExecutor.purge() видалить з черги всі скасовані завдання, щоб звільнити місце в черзі. Ще однією цікавою функцією, пов'язаною з чергою, є обробник неприйнятих завдань:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Для прикладу обробник просто виводить слово Rejected на кожну відмову приймати завдання в чергу. Зручно, чи не так?
Крім того, ThreadPoolExecutor має цікавого нащадка — ScheduledThreadPoolExecutor, який є ScheduledExecutorService. Він надає можливість виконувати завдання за таймером.
ScheduledExecutorService
ExecutorService типу ScheduledExecutorService дозволяють запускати завдання за розкладом (schedule).
Подивимось на приклад:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Тут все просто. Відправляються завдання, отримуємо "заплановане завдання" java.util.concurrent.ScheduledFuture.
З розкладом може бути корисний також і наступний випадок:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Тут ми відправляємо Runnable завдання на виконання з фіксованою частотою (Fixed Rate) з певною затримкою. У даному випадку, через 1 секунду кожні 2 секунди почати виконувати завдання.
Є подібний варіант:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Але тут завдання виконуються із заданим проміжком МІЖ виконанням різних завдань. Тобто завдання task буде виконане через 1 секунду. Потім, як тільки воно буде завершене, пройде 2 секунди, і тоді нове завдання task буде запущене.
Про цю тему можна почитати наступні матеріали:
- An introduction to thread pools
- Introduction to Thread Pools
- Java Multithreading Steeplechase: Cancelling Tasks In Executors
- Picking correct Java executors for background tasks

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
Поміж вказаних вище пулів потоків, є ще один. Можна сказати, що він трохи особливий. Ім'я йому — Work Stealing Pool. Якщо коротко, то Work Stealing — це такий алгоритм роботи, за якого потоки, що простоюють, починають забирати завдання інших потоків або завдання зі спільної черги. Подивимось на приклад:
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Якщо ми запустимо цей код, то ExecutorService нам створить 5 потоків, тому що кожен потік буде вставати в wait чергу по локу об'єкта lock. Про монітори та локи по ньому ми вже раніше розбирали у "Thread'ом Java не зіпсуєш: Частина II — синхронізація".
А тепер ми замінимо Executors.newCachedThreadPool на Executors.newWorkStealingPool(). Що зміниться?
Ми побачимо, що наші завдання виконуються не в 5 потоків, а менше. Пам'ятайте, що cachedThreadPool створював для кожного завдання свій потік? Тому що wait блокував потік, а наступні завдання хотіли виконуватись і в пулі для них створювалися нові потоки. У випадку з StealingPool потоки не будуть вічно простоювати в wait, вони почнуть виконувати сусідні завдання.
Чим так відрізняється від решти thread pool цей WorkStealingPool? Тим, що всередині нього живе насправді чарівний ForkJoinPool:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Насправді є ще одна відмінність. Потоки, які створюються для ForkJoinPool за замовчуванням є демон потоками, на відміну від потоків, створених через звичайний ThreadPool. Загалом, варто пам'ятати про демон-потоки, тому що, наприклад, при CompletableFuture теж використовуються демон-потоки, якщо не вказати свою ThreadFactory, яка буде створювати не демон-потоки. Ось такі сюрпризи можуть чекати в несподіваному місці!)
Fork/Join Pool
У цій частині ми поговоримо про той самийForkJoinPool (його ще називають fork/join framework), який живе "під капотом" у WorkStealingPool. Загалом, Fork Join Framework з'явився ще в Java 1.7. І хоча вже Java 11 на дворі, але згадати все одно варто. Не найпоширеніше завдання, але досить цікаве.
На просторах мережі є гарний огляд на цю тему: "Fork/Join Framework в Java 7".
Fork/JoinPool оперує в своїй роботі таким поняттям, як java.util.concurrent.RecursiveTask. Також є аналог — java.util.concurrent.RecursiveAction. RecursiveAction не повертають результат. Таким чином RecursiveTask схожий на Callable, а RecursiveAction схожий на Runnable.
Ну і дивлячись на назву ми бачимо два ключові методи — fork і join. Метод fork запускає асинхронно в окремому потоці певне завдання. А метод join дозволяє дочекатися завершення виконання роботи.
Існує кілька способів використання:
Ця картинка — це частина слайду доповіді Олексія Шипильова "Fork/Join: реалізація, використання, продуктивність".
Щоб стало зрозуміліше, варто подивитися його доповідь на JEE CONF: "Fork Join особливості реалізації".
Підведення підсумків
Отже, ось ми і закінчили чергову частину огляду. Ми розібрались, що спочатку придумалиExecutor для виконання потоків. Потім вирішили продовжити ідею і придумали ExecutorService. ExecutorService дозволяє відправляти завдання на виконання за допомогою submit і invoke, а також керувати сервісом, вимикаючи його. Т.к. ExecutorService'у потрібні реалізації, написали клас з фабричними методами і назвали його Executors. Він дозволяє створювати пули потоків ThreadPoolExecutor'и. При цьому існують такі пули потоків, які дозволяють ще і вказати розклад для виконання, а за WorkStealingPool ховається ForkJoinPool.
Сподіваюся, тобі було не тільки цікаво вище написане, але і зрозуміло ) Завжди радий пропозиціям і зауваженням.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ