JavaRush/Java блог/Random UA/Thread'ом Java не зіпсуєш: Частина V - Executor, ThreadPo...
Viacheslav
3 рівень

Thread'ом Java не зіпсуєш: Частина V - Executor, ThreadPool, Fork Join

Стаття з групи Random UA
учасників

Вступ

Отже, ми знаємо, що в Java є потоки, про що можна прочитати в огляді " Thread'ом Java не зіпсуєш: Частина I - потоки ". Thread'ом Java не зіпсуєш: Частина V - Executor, ThreadPool, Fork Join - 1Давайте ще раз подивимося на типовий код:
public static void main(String []args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Як ми бачимо, код для запуску завдання досить типовий, але на кожен новий запуск нам його доведеться повторювати. Одне з рішень - винести його в окремий метод, наприклад execute(Runnable runnable). Але розробники Java за нас вже потурбувалися і вигадали інтерфейс Executor:
public static void main(String []args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Як видно, код став лаконічнішим і дозволив нам просто написати код із запуску Runnableв потоці. Здорово, чи не так? Але це лише початок: Thread'ом Java не зіпсуєш: Частина V - Executor, ThreadPool, Fork Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Як видно, інтерфейс Executorмає інтерфейс-спадкоємець ExecutorService. У JavaDoc даного інтерфейсу сказано, що ExecutorServiceє описом особливого Executor'а, який надає методи зупинення роботи Executor'а і дозволяє отримати java.util.concurrent.Future, щоб відстежувати процес виконання. Раніше, в " Thread'ом Java не зіпсуєш: Частина IV - Callable, Future і друзі " ми вже коротко розглянули можливості Future. Хто забув чи не читав - раджу освіжити у пам'яті ;) Що ще цікавого в JavaDoc написано? Що у нас є спеціальна фабрика java.util.concurrent.Executors, яка дозволяє створювати доступні за замовчуванням реалізації ExecutorService.

ExecutorService

Ще раз згадаємо. У нас є Executorдля execute (тобто виконання) якогось завдання в потоці, коли реалізація створення потоку прихована від нас. У нас є ExecutorServiceособливий Executor, який має набір можливостей з управління ходом виконання. І у нас є фабрика Executors, яка дозволяє створювати ExecutorService. Давайте тепер це зробимо самі:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Як бачимо, ми вказали фіксований пул потоків ( Fixed Thread Pool) розміром 2. Після цього ми по черзі відправляємо в пул завдання. Кожне завдання повертає рядок ( String), який містить ім'я потоку ( currentThread().getName()). Важливо в самому кінці виконати shutdown для ExecutorService, тому що в іншому випадку наша програма не завершиться. У фабриці Executorsє інші фабричні методи. Наприклад, ми можемо створити пул всього з одного потоку newSingleThreadExecutorабо пул з кешуванням newCachedThreadPool, коли потоки будуть забиратися з пулу, якщо вони простоюють 1 хвабону. Насправді, за цими ExecutorServiceховається блокуюча черга , в яку поміщаються завдання і з якої виконуються ці завдання. Детальніше про блокуючі черги можна переглянути у відео "Блокуюча черга - Collections #5 - Advanced Java ". А також можна прочитати огляд " Блокуючі черги пакета concurrent " і відповідь на питання " When to prefer LinkedBlockingQueue over ArrayBlockingQueue? ". Супер спрощено - BlockingQueue(блокуюча черга) блокує потік, у двох випадках:
  • потік намагається отримати елементи з порожньої черги
  • потік намагається покласти елементи на повну чергу
Якщо подивитися на реалізацію фабричних методів, ми побачимо, як вони влаштовані. Наприклад:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
або
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Як бачимо, всередині фабричних методів створюються реалізації ExecutorService. І в основному це ThreadPoolExecutor. Змінюються лише атрибути, які впливають на роботу. Thread'ом Java не зіпсуєш: Частина V - Executor, ThreadPool, Fork Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Як ми раніше побачабо, усередині фабричних методів переважно створюється ThreadPoolExecutor. На функціональність впливає те, які значення передані як максимум і мінімум потоків, а також яка черга використовується. А використовуватись може будь-яка реалізація інтерфейсу java.util.concurrent.BlockingQueue. Говорячи про ThreadPoolExecutor'ах, варто відзначити цікаві особливості при роботі. Наприклад, не можна посилати завдання у ThreadPoolExecutor, якщо там немає місця:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Цей код впаде з помилкою:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Тобто taskне можна засабити, т.к. SynchronousQueueвлаштована так, що фактично складається з одного елемента та не дозволяє покласти туди більше. Як бачимо, queued tasksтут 0, й у немає нічого дивного, т.к. це специфіка SynchronousQueue- це черга в один елемент, яка завжди порожня. (!) Коли один потік кладе в чергу елемент, він чекатиме, доки інший потік не забере елемент із черги. Тому ми можемо замінити на new LinkedBlockingQueue<>(1)і в помилці зміниться те, що буде вказано queued tasks = 1. Т.к. Насамперед один елемент, то другий ми вже покласти не можемо. І впадемо на цьому. Продовжуючи тему черги, варто зазначити, що клас ThreadPoolExecutorмає додаткові методи обслуговування черги. Наприклад, методthreadPoolExecutor.purge()видалить із черги всі скасовані завдання, щоб звільнити місце у черзі. Ще однією цікавою функцією, пов'язаною з чергою, є обробник неприйнятих завдань:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Для прикладу обробник просто виводить слово Rejectedна кожну відмову приймати завдання в чергу. Зручно, чи не так? Крім того, ThreadPoolExecutorмає цікавого спадкоємця - ScheduledThreadPoolExecutor, Який є ScheduledExecutorService. Він надає можливість виконувати завдання за таймером.

ScheduledExecutorService

ExecutorServiceтипу ScheduledExecutorServiceдозволяють запускати завдання за розкладом (schedule). Подивимося на приклад:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Тут все просто. Вирушають завдання, отримуємо "заплановане завдання" java.util.concurrent.ScheduledFuture. З розкладом може бути корисним і наступний випадок:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Тут ми відправляємо Runnableзавдання на виконання з фіксованою частотою (Fixed Rate) із певною затримкою. У цьому випадку через 1 секунду кожні 2 секунди почати виконувати завдання. Є схожий варіант:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Але тут завдання виконуються із заданим проміжком МІЖ виконанням різних завдань. Тобто завдання taskбуде виконано за 1 секунду. Далі, коли вона буде завершена, пройде 2 секунди, і тоді нове завдання task буде запущено. На цю тему можна прочитати такі матеріали: Thread'ом Java не зіпсуєш: Частина V - Executor, ThreadPool, Fork Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

Крім зазначених вище пулів потоків є ще один. Можна сказати, що він трохи особливий. Ім'я йому – Work Stealing Pool. Якщо коротко, то Work Stealing — це такий алгоритм роботи, при якому потоки, що простоюють, починають забирати завдання інших потоків або завдання із загальної черги. Подивимося на приклад:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Якщо ми запустимо цей код, то ExecutorServiceстворить 5 потоків, т.к. кожен потік буде вставати у wait чергу по локу об'єкта lock. Про монітори і локи по ньому ми вже раніше розбиралися в " Thread'ом Java не зіпсуєш: Частина II - синхронізація ". А тепер ми замінимо Executors.newCachedThreadPoolна Executors.newWorkStealingPool(). Що зміниться? Ми побачимо, що наші завдання виконуються не 5 потоків, а менше. Помнете, що cachedThreadPoolстворював на кожне завдання свій потік? Тому що waitблокував потік, а наступні завдання хотіли виконуватись і в пулі для них створювалися нові потоки. У випадку з StealingPoolпотоками не вічно простоюватимуть в wait, вони почнуть виконувати сусідні завдання. Чим так відрізняється від решти тредпулів цей WorkStealingPool? Тим, що в ньому живе насправді чарівнийForkJoinPool:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Насправді є ще одна відмінність. Потоки, які створюються за ForkJoinPoolумовчанням є демон потоками, на відміну від потоків, створених через звичайний ThreadPool. Взагалі варто пам'ятати про демон-потоки, т.к. наприклад при CompletableFutureтеж використовуються демон-потоки, якщо не вказати свою ThreadFactory, яка створюватиме не демон-потоки. Ось такі сюрпризи можуть чекати в несподіваному місці!)

Fork/Join Pool

У цій частині ми поговоримо про той ForkJoinPool(його ще називають fork/join framework), який живе "під капотом" у WorkStealingPool. Взагалі, Fork Join Framework з'явився ще Java 1.7. І нехай уже Java 11 надворі, але згадати все одно варто. Не найпоширеніша задача, але досить цікава. На просторах мережі є хороший огляд на цю тему: " Fork/Join Framework у Java 7 ". Fork/JoinPoolоперує у роботі таким поняттям як java.util.concurrent.RecursiveTask. Також є аналог - java.util.concurrent.RecursiveAction. RecursiveAction не повертає результату. У такий спосіб RecursiveTaskсхожий на Callable, а RecursiveActionсхожий на Runnable. Ну і дивлячись на назву ми бачимо два ключові методи - forkі join. Метод forkзапускає асинхронно в окремому потоці певне завдання. А методjoinдозволяє дочекатися завершення виконання роботи. Існує кілька способів використання: Thread'ом Java не зіпсуєш: Частина V - Executor, ThreadPool, Fork Join - 5Даний малюнок - це частина слайду доповіді Олексія Шипільова " Fork/Join: реалізація, використання, продуктивність ". Щоб стало зрозуміліше, варто подивитися його доповідь на JEE CONF: " Fork Join особливості реалізації ".

Підбиття підсумків

Отож ми й закінчабо чергову частину огляду. Ми розібралися, що спочатку вигадали Executorдля виконання потоків. Потім вирішабо продовжити ідею і вигадали ExecutorService. ExecutorServiceдозволяє відправляти завдання на виконання за допомогою submitта invoke, а також керувати сервісом, вимикаючи його. Т.к. ExecutorService'у потрібні реалізації, написали клас із фабричними методами та назвали його Executors. Він дозволяє створювати пули потоків ThreadPoolExecutor. При цьому існують такі пули потоків, які дозволяють ще й вказати розклад для виконання, а за WorkStealingPoolховається ForkJoinPool. Сподіваюся, Вам було не тільки цікаво вище написане, але й зрозуміло) Завжди радий пропозиціям та зауваженням. #Viacheslav
Коментарі
  • популярні
  • нові
  • старі
Щоб залишити коментар, потрібно ввійти в систему
Для цієї сторінки немає коментарів.