Вступ
Отже, ми знаємо, що в Java є потоки, про що можна прочитати в огляді " Thread'ом Java не зіпсуєш: Частина I - потоки ". Давайте ще раз подивимося на типовий код:public static void main(String []args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Як ми бачимо, код для запуску завдання досить типовий, але на кожен новий запуск нам його доведеться повторювати. Одне з рішень - винести його в окремий метод, наприклад execute(Runnable runnable)
. Але розробники Java за нас вже потурбувалися і вигадали інтерфейс Executor
:
public static void main(String []args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Як видно, код став лаконічнішим і дозволив нам просто написати код із запуску Runnable
в потоці. Здорово, чи не так? Але це лише початок:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
має інтерфейс-спадкоємець ExecutorService
. У JavaDoc даного інтерфейсу сказано, що ExecutorService
є описом особливого Executor
'а, який надає методи зупинення роботи Executor
'а і дозволяє отримати java.util.concurrent.Future
, щоб відстежувати процес виконання. Раніше, в " Thread'ом Java не зіпсуєш: Частина IV - Callable, Future і друзі " ми вже коротко розглянули можливості Future
. Хто забув чи не читав - раджу освіжити у пам'яті ;) Що ще цікавого в JavaDoc написано? Що у нас є спеціальна фабрика java.util.concurrent.Executors
, яка дозволяє створювати доступні за замовчуванням реалізації ExecutorService
.
ExecutorService
Ще раз згадаємо. У нас єExecutor
для execute (тобто виконання) якогось завдання в потоці, коли реалізація створення потоку прихована від нас. У нас є ExecutorService
особливий Executor
, який має набір можливостей з управління ходом виконання. І у нас є фабрика Executors
, яка дозволяє створювати ExecutorService
. Давайте тепер це зробимо самі:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Як бачимо, ми вказали фіксований пул потоків ( Fixed Thread Pool
) розміром 2. Після цього ми по черзі відправляємо в пул завдання. Кожне завдання повертає рядок ( String
), який містить ім'я потоку ( currentThread().getName()
). Важливо в самому кінці виконати shutdown для ExecutorService
, тому що в іншому випадку наша програма не завершиться. У фабриці Executors
є інші фабричні методи. Наприклад, ми можемо створити пул всього з одного потоку newSingleThreadExecutor
або пул з кешуванням newCachedThreadPool
, коли потоки будуть забиратися з пулу, якщо вони простоюють 1 хвабону. Насправді, за цими ExecutorService
ховається блокуюча черга , в яку поміщаються завдання і з якої виконуються ці завдання. Детальніше про блокуючі черги можна переглянути у відео "Блокуюча черга - Collections #5 - Advanced Java ". А також можна прочитати огляд " Блокуючі черги пакета concurrent " і відповідь на питання " When to prefer LinkedBlockingQueue over ArrayBlockingQueue? ". Супер спрощено - BlockingQueue
(блокуюча черга) блокує потік, у двох випадках:
- потік намагається отримати елементи з порожньої черги
- потік намагається покласти елементи на повну чергу
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
або
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Як бачимо, всередині фабричних методів створюються реалізації ExecutorService
. І в основному це ThreadPoolExecutor
. Змінюються лише атрибути, які впливають на роботу.
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
Як ми раніше побачабо, усередині фабричних методів переважно створюєтьсяThreadPoolExecutor
. На функціональність впливає те, які значення передані як максимум і мінімум потоків, а також яка черга використовується. А використовуватись може будь-яка реалізація інтерфейсу java.util.concurrent.BlockingQueue
. Говорячи про ThreadPoolExecutor
'ах, варто відзначити цікаві особливості при роботі. Наприклад, не можна посилати завдання у ThreadPoolExecutor
, якщо там немає місця:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Цей код впаде з помилкою:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Тобто task
не можна засабити, т.к. SynchronousQueue
влаштована так, що фактично складається з одного елемента та не дозволяє покласти туди більше. Як бачимо, queued tasks
тут 0, й у немає нічого дивного, т.к. це специфіка SynchronousQueue
- це черга в один елемент, яка завжди порожня. (!) Коли один потік кладе в чергу елемент, він чекатиме, доки інший потік не забере елемент із черги. Тому ми можемо замінити на new LinkedBlockingQueue<>(1)
і в помилці зміниться те, що буде вказано queued tasks = 1
. Т.к. Насамперед один елемент, то другий ми вже покласти не можемо. І впадемо на цьому. Продовжуючи тему черги, варто зазначити, що клас ThreadPoolExecutor
має додаткові методи обслуговування черги. Наприклад, методthreadPoolExecutor.purge()
видалить із черги всі скасовані завдання, щоб звільнити місце у черзі. Ще однією цікавою функцією, пов'язаною з чергою, є обробник неприйнятих завдань:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Для прикладу обробник просто виводить слово Rejected
на кожну відмову приймати завдання в чергу. Зручно, чи не так? Крім того, ThreadPoolExecutor
має цікавого спадкоємця - ScheduledThreadPoolExecutor
, Який є ScheduledExecutorService
. Він надає можливість виконувати завдання за таймером.
ScheduledExecutorService
ExecutorService
типу ScheduledExecutorService
дозволяють запускати завдання за розкладом (schedule). Подивимося на приклад:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Тут все просто. Вирушають завдання, отримуємо "заплановане завдання" java.util.concurrent.ScheduledFuture
. З розкладом може бути корисним і наступний випадок:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Тут ми відправляємо Runnable
завдання на виконання з фіксованою частотою (Fixed Rate) із певною затримкою. У цьому випадку через 1 секунду кожні 2 секунди почати виконувати завдання. Є схожий варіант:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Але тут завдання виконуються із заданим проміжком МІЖ виконанням різних завдань. Тобто завдання task
буде виконано за 1 секунду. Далі, коли вона буде завершена, пройде 2 секунди, і тоді нове завдання task буде запущено. На цю тему можна прочитати такі матеріали:
- An introduction to thread pools
- Introduction to Thread Pools
- Java Multithreading Steeplechase: Cancelling Tasks In Executors
- Picking correct Java executors for background tasks
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
Крім зазначених вище пулів потоків є ще один. Можна сказати, що він трохи особливий. Ім'я йому – Work Stealing Pool. Якщо коротко, то Work Stealing — це такий алгоритм роботи, при якому потоки, що простоюють, починають забирати завдання інших потоків або завдання із загальної черги. Подивимося на приклад:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Якщо ми запустимо цей код, то ExecutorService
створить 5 потоків, т.к. кожен потік буде вставати у wait чергу по локу об'єкта lock
. Про монітори і локи по ньому ми вже раніше розбиралися в " Thread'ом Java не зіпсуєш: Частина II - синхронізація ". А тепер ми замінимо Executors.newCachedThreadPool
на Executors.newWorkStealingPool()
. Що зміниться? Ми побачимо, що наші завдання виконуються не 5 потоків, а менше. Помнете, що cachedThreadPool
створював на кожне завдання свій потік? Тому що wait
блокував потік, а наступні завдання хотіли виконуватись і в пулі для них створювалися нові потоки. У випадку з StealingPool
потоками не вічно простоюватимуть в wait
, вони почнуть виконувати сусідні завдання. Чим так відрізняється від решти тредпулів цей WorkStealingPool
? Тим, що в ньому живе насправді чарівнийForkJoinPool
:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Насправді є ще одна відмінність. Потоки, які створюються за ForkJoinPool
умовчанням є демон потоками, на відміну від потоків, створених через звичайний ThreadPool
. Взагалі варто пам'ятати про демон-потоки, т.к. наприклад при CompletableFuture
теж використовуються демон-потоки, якщо не вказати свою ThreadFactory
, яка створюватиме не демон-потоки. Ось такі сюрпризи можуть чекати в несподіваному місці!)
Fork/Join Pool
У цій частині ми поговоримо про тойForkJoinPool
(його ще називають fork/join framework), який живе "під капотом" у WorkStealingPool
. Взагалі, Fork Join Framework з'явився ще Java 1.7. І нехай уже Java 11 надворі, але згадати все одно варто. Не найпоширеніша задача, але досить цікава. На просторах мережі є хороший огляд на цю тему: " Fork/Join Framework у Java 7 ". Fork/JoinPool
оперує у роботі таким поняттям як java.util.concurrent.RecursiveTask
. Також є аналог - java.util.concurrent.RecursiveAction
. RecursiveAction не повертає результату. У такий спосіб RecursiveTask
схожий на Callable
, а RecursiveAction
схожий на Runnable
. Ну і дивлячись на назву ми бачимо два ключові методи - fork
і join
. Метод fork
запускає асинхронно в окремому потоці певне завдання. А методjoin
дозволяє дочекатися завершення виконання роботи. Існує кілька способів використання: Даний малюнок - це частина слайду доповіді Олексія Шипільова " Fork/Join: реалізація, використання, продуктивність ". Щоб стало зрозуміліше, варто подивитися його доповідь на JEE CONF: " Fork Join особливості реалізації ".
Підбиття підсумків
Отож ми й закінчабо чергову частину огляду. Ми розібралися, що спочатку вигадалиExecutor
для виконання потоків. Потім вирішабо продовжити ідею і вигадали ExecutorService
. ExecutorService
дозволяє відправляти завдання на виконання за допомогою submit
та invoke
, а також керувати сервісом, вимикаючи його. Т.к. ExecutorService
'у потрібні реалізації, написали клас із фабричними методами та назвали його Executors
. Він дозволяє створювати пули потоків ThreadPoolExecutor
. При цьому існують такі пули потоків, які дозволяють ще й вказати розклад для виконання, а за WorkStealingPool
ховається ForkJoinPool
. Сподіваюся, Вам було не тільки цікаво вище написане, але й зрозуміло) Завжди радий пропозиціям та зауваженням. #Viacheslav
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ