Вступ

Отже, ми знаємо, що в Java є потоки, про що можна прочитати в огляді "Thread'ом Java не зіпсуєш : Частина I - потоки". Давайте ще раз подивимося на типовий код:

public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Задача виконана");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Як ми бачимо, код для запуску задачі досить типовий, але для кожного нового запуску нам його доведеться повторювати. Одне із рішень — винести його в окремий метод, наприклад, execute(Runnable runnable). Але розробники Java вже подбали про нас і придумали інтерфейс Executor:

public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Задача виконана");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Як видно, код став лаконічнішим і дозволив нам просто написати код для запуску Runnable у потоці. Круто, чи не так? Але це лише початок: Thread'ом Java не зіпсуєш: Частина V — Executor, ThreadPool, Fork Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Як видно, в інтерфейсу Executor є інтерфейс-нащадок ExecutorService. У JavaDoc цього інтерфейсу сказано, що ExecutorService є описом особливого Executor, який надає методи зупинки роботи Executor і дозволяє отримати java.util.concurrent.Future, щоб відстежувати процес виконання. Раніше, в "Thread'ом Java не зіпсуєш : Частина IV — Callable, Future і друзі" ми вже коротко розглянули можливості Future. Хто забув або не читав - раджу освіжити в пам'яті ;) Що ще цікавого в JavaDoc написано? Що у нас є спеціальна фабрика java.util.concurrent.Executors, яка дозволяє створювати доступні за замовчуванням реалізації ExecutorService.

ExecutorService

Ще раз згадаємо. У нас є Executor для execute (тобто виконання) певної задачі у потоці, коли реалізація створення потоку захована від нас. У нас є ExecutorService — особливий Executor, який має набір можливостей з управління ходом виконання. І у нас є фабрика Executors, яка дозволяє створювати ExecutorService. Давайте тепер це зробимо самі:

public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Як ми бачимо, ми вказали фіксований пул потоків (Fixed Thread Pool) розміром 2. Після чого ми по черзі надсилаємо в пул задачі. Кожна задача повертає рядок (String), який містить ім'я потоку (currentThread().getName()). Важливо наприкінці виконати shutdown для ExecutorService, адже інакше наша програма не завершиться. У фабриці Executors є й інші фабричні методи. Наприклад, ми можемо створити пул лише з одного потоку — newSingleThreadExecutor або пул із кешуванням newCachedThreadPool, коли потоки будуть видалятися з пулу, якщо вони простоюють 1 хвилину. Насправді, за цими ExecutorService ховається блокуюча черга, в яку поміщаються задачі і з якої ці задачі виконуються. Докладніше про блокуючі черги можна подивитися у відео "Блокуюча черга - Collections #5 - Advanced Java". А також можна прочитати огляд "Блокуючі черги пакету concurrent" і відповідь на запитання "When to prefer LinkedBlockingQueue over ArrayBlockingQueue?". Супер спрощено — BlockingQueue (блокуюча черга) блокує потік у двох випадках:
  • потік намагається отримати елементи з порожньої черги
  • потік намагається покласти елементи у повну чергу
Якщо подивитися на реалізацію фабричних методів, то ми побачимо, як вони влаштовані. Наприклад:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
або

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Як ми бачимо, всередині фабричних методів створюються реалізації ExecutorService. І в основному це ThreadPoolExecutor. Змінюються тільки атрибути, які і впливають на роботу. Thread'ом Java не зіпсуєш: Частина V — Executor, ThreadPool, Fork Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Як ми раніше побачили, всередині фабричних методів в основному створюється ThreadPoolExecutor. На функціональність впливає те, які значення передані як максимуму та мінімуму потоків, а також яка черга використовується. А використовуватися може будь-яка реалізація інтерфейсу java.util.concurrent.BlockingQueue. Говорячи про ThreadPoolExecutor, варто зазначити цікаві особливості при роботі. Наприклад, не можна надсилати задачі в ThreadPoolExecutor, якщо там немає місця:

public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Цей код впаде з помилкою виду:

Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Тобто task не можна засабмитити, адже SynchronousQueue влаштована так, що фактично складається з одного елемента і не дозволяє покласти туди більше. Як ми бачимо, queued tasks тут 0, і в цьому немає нічого дивного, адже це специфіка SynchronousQueue — фактично це черга в 1 елемент, яка завжди порожня. (!) Коли один потік кладе в чергу елемент, він чекатиме, поки інший потік не забере елемент з черги. Тому, ми можемо змінити на new LinkedBlockingQueue<>(1) і в помилці зміниться те, що буде вказано queued tasks = 1. Адже черга лише на 1 елемент, то другий ми вже не можемо покласти. І впадемо на цьому. Продовжуючи тему черги, варто зазначити, що клас ThreadPoolExecutor має додаткові методи з обслуговування черги. Наприклад, метод threadPoolExecutor.purge() видалить з черги всі скасовані завдання, щоб звільнити місце в черзі. Ще однією цікавою функцією, пов'язаною з чергою, є обробник неприйнятих завдань:

public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
        0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Для прикладу обробник просто виводить слово Rejected на кожну відмову приймати завдання в чергу. Зручно, чи не так? Крім того, ThreadPoolExecutor має цікавого нащадка — ScheduledThreadPoolExecutor, який є ScheduledExecutorService. Він надає можливість виконувати завдання за таймером.

ScheduledExecutorService

ExecutorService типу ScheduledExecutorService дозволяють запускати завдання за розкладом (schedule). Подивимось на приклад:

public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Тут все просто. Відправляються завдання, отримуємо "заплановане завдання" java.util.concurrent.ScheduledFuture. З розкладом може бути корисний також і наступний випадок:

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Тут ми відправляємо Runnable завдання на виконання з фіксованою частотою (Fixed Rate) з певною затримкою. У даному випадку, через 1 секунду кожні 2 секунди почати виконувати завдання. Є подібний варіант:

scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Але тут завдання виконуються із заданим проміжком МІЖ виконанням різних завдань. Тобто завдання task буде виконане через 1 секунду. Потім, як тільки воно буде завершене, пройде 2 секунди, і тоді нове завдання task буде запущене. Про цю тему можна почитати наступні матеріали: Thread'ом Java не зіпсуєш: Частина V — Executor, ThreadPool, Fork Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

Поміж вказаних вище пулів потоків, є ще один. Можна сказати, що він трохи особливий. Ім'я йому — Work Stealing Pool. Якщо коротко, то Work Stealing — це такий алгоритм роботи, за якого потоки, що простоюють, починають забирати завдання інших потоків або завдання зі спільної черги. Подивимось на приклад:

public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Якщо ми запустимо цей код, то ExecutorService нам створить 5 потоків, тому що кожен потік буде вставати в wait чергу по локу об'єкта lock. Про монітори та локи по ньому ми вже раніше розбирали у "Thread'ом Java не зіпсуєш: Частина II — синхронізація". А тепер ми замінимо Executors.newCachedThreadPool на Executors.newWorkStealingPool(). Що зміниться? Ми побачимо, що наші завдання виконуються не в 5 потоків, а менше. Пам'ятайте, що cachedThreadPool створював для кожного завдання свій потік? Тому що wait блокував потік, а наступні завдання хотіли виконуватись і в пулі для них створювалися нові потоки. У випадку з StealingPool потоки не будуть вічно простоювати в wait, вони почнуть виконувати сусідні завдання. Чим так відрізняється від решти thread pool цей WorkStealingPool? Тим, що всередині нього живе насправді чарівний ForkJoinPool:

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Насправді є ще одна відмінність. Потоки, які створюються для ForkJoinPool за замовчуванням є демон потоками, на відміну від потоків, створених через звичайний ThreadPool. Загалом, варто пам'ятати про демон-потоки, тому що, наприклад, при CompletableFuture теж використовуються демон-потоки, якщо не вказати свою ThreadFactory, яка буде створювати не демон-потоки. Ось такі сюрпризи можуть чекати в несподіваному місці!)

Fork/Join Pool

У цій частині ми поговоримо про той самий ForkJoinPool (його ще називають fork/join framework), який живе "під капотом" у WorkStealingPool. Загалом, Fork Join Framework з'явився ще в Java 1.7. І хоча вже Java 11 на дворі, але згадати все одно варто. Не найпоширеніше завдання, але досить цікаве. На просторах мережі є гарний огляд на цю тему: "Fork/Join Framework в Java 7". Fork/JoinPool оперує в своїй роботі таким поняттям, як java.util.concurrent.RecursiveTask. Також є аналог — java.util.concurrent.RecursiveAction. RecursiveAction не повертають результат. Таким чином RecursiveTask схожий на Callable, а RecursiveAction схожий на Runnable. Ну і дивлячись на назву ми бачимо два ключові методи — fork і join. Метод fork запускає асинхронно в окремому потоці певне завдання. А метод join дозволяє дочекатися завершення виконання роботи. Існує кілька способів використання: Thread'ом Java не зіпсуєш: Частина V — Executor, ThreadPool, Fork Join - 5Ця картинка — це частина слайду доповіді Олексія Шипильова "Fork/Join: реалізація, використання, продуктивність". Щоб стало зрозуміліше, варто подивитися його доповідь на JEE CONF: "Fork Join особливості реалізації".

Підведення підсумків

Отже, ось ми і закінчили чергову частину огляду. Ми розібрались, що спочатку придумали Executor для виконання потоків. Потім вирішили продовжити ідею і придумали ExecutorService. ExecutorService дозволяє відправляти завдання на виконання за допомогою submit і invoke, а також керувати сервісом, вимикаючи його. Т.к. ExecutorService'у потрібні реалізації, написали клас з фабричними методами і назвали його Executors. Він дозволяє створювати пули потоків ThreadPoolExecutor'и. При цьому існують такі пули потоків, які дозволяють ще і вказати розклад для виконання, а за WorkStealingPool ховається ForkJoinPool. Сподіваюся, тобі було не тільки цікаво вище написане, але і зрозуміло ) Завжди радий пропозиціям і зауваженням.