Вступ
Потоки – штука цікава. У попередніх оглядах ми розглянули деякі доступні засоби реалізації багатопоточності. Погляньмо, що ми можемо зробити ще цікавого. На цей момент ми багато що знаємо. Наприклад, з "
Thread'ом Java не зіпсуєш: Частина I - потоки " ми знаємо, що потік - це Thread. Ми знаємо, що потік виконує певне завдання. Якщо хочемо, щоб наше завдання могли запустити (
run
), ми повинні вказати потоку якийсь
Runnable
.
Щоб згадати, можемо скористатися
Tutorialspoint Java Online Compiler 'ом:
public static void main(String []args){
Runnable task = () -> {
Thread thread = Thread.currentThread();
System.out.println("Hello from " + thread.getName());
};
Thread thread = new Thread(task);
thread.start();
}
Також ми знаємо про те, що у нас є таке поняття, як лок. Про нього ми читали в "
Thread'ом Java не зіпсуєш: Частина II - синхронізація ". Потік може займати лок і тоді інший потік, який спробує зайняти лок, буде змушений чекати на звільнення локу:
import java.util.concurrent.locks.*;
public class HelloWorld{
public static void main(String []args){
Lock lock = new ReentrantLock();
Runnable task = () -> {
lock.lock();
Thread thread = Thread.currentThread();
System.out.println("Hello from " + thread.getName());
lock.unlock();
};
Thread thread = new Thread(task);
thread.start();
}
}
Думаю, настав час поговорити про те, що ми ще можемо цікаве зробити.
Семафори
Найпростіший засіб контролю над тим, скільки потоків можуть одночасно працювати — семафор. Як на залізниці. Горить зелений – можна. Горить червоний – чекаємо. Що ми чекаємо на семафор? Дозволи. Дозвіл англійською - permit. Щоб отримати дозвіл - його потрібно отримати, що англійською буде acquire. А коли дозвіл більше не потрібно ми його повинні віддати, тобто звільнити його або позбудеться його, що англійською буде release. Подивимося, як це працює. Нам буде потрібно імпорт класу
java.util.concurrent.Semaphore
. Приклад:
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(0);
Runnable task = () -> {
try {
semaphore.acquire();
System.out.println("Finished");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
new Thread(task).start();
Thread.sleep(5000);
semaphore.release(1);
}
Як бачимо, запам'ятавши англійські слова, ми розуміємо як працює семафор. Цікаво, що головна умова - на "рахунку" семафору має бути позитивна кількість permit'ів. Тому ініціювати його можна і з мінусом. І вимагати (acquire) можна більше, ніж 1.
CountDownLatch
Наступний механізм -
CountDownLatch
. CountDown англійською – це відлік, а Latch – засувка або клямка. Тобто якщо перекладати, це засувка з відліком. Тут нам знадобиться відповідний імпорт класу
java.util.concurrent.CountDownLatch
. Це схоже на біги чи гонки, коли всі збираються біля стартової лінії і коли всі готові – дають дозвіл, і всі одночасно стартують. Приклад:
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(3);
Runnable task = () -> {
try {
countDownLatch.countDown();
System.out.println("Countdown: " + countDownLatch.getCount());
countDownLatch.await();
System.out.println("Finished");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
for (int i = 0; i < 3; i++) {
new Thread(task).start();
}
}
await англійською - очікувати. Тобто ми спочатку говоримо
countDown
. Як каже ґуґл перекладач, count down - "an act of counting numerals in reverse order to zero", тобто виконати дію за зворотним відліком, мета якого - дорахувати до нуля. А далі говоримо
await
— тобто чекати, доки значення лічильника не стане нульовим. Цікаво, що такий лічильник є одноразовим. Як сказано в JavaDoc - "When threads must repeatedly count down in this way, instead use a CyclicBarrier", тобто якщо потрібен багаторазовий рахунок - треба використовувати інший варіант, який називається
CyclicBarrier
.
CyclicBarrier
Як і випливає з назви,
CyclicBarrier
це циклічний бар'єр. Нам знадобиться імпорт класу
java.util.concurrent.CyclicBarrier
. Подивимося на приклад:
public static void main(String[] args) throws InterruptedException {
Runnable action = () -> System.out.println("На старт!");
CyclicBarrier berrier = new CyclicBarrier(3, action);
Runnable task = () -> {
try {
berrier.await();
System.out.println("Finished");
} catch (BrokenBarrierException | InterruptedException e) {
e.printStackTrace();
}
};
System.out.println("Limit: " + berrier.getParties());
for (int i = 0; i < 3; i++) {
new Thread(task).start();
}
}
Як бачимо, потік виконує
await
, тобто чекає. У цьому зменшується значення бар'єру. Бар'єр вважається зламаним (
berrier.isBroken()
), коли відлік дійшов до нуля. Щоб скинути бар'єр, потрібно викликати
berrier.reset()
, чого не вистачало
CountDownLatch
.
Exchanger
Наступний засіб -
Exchanger
. Exchange з англійської перекладається як обмін або обмінюватися. А
Exchanger
обмінник, тобто те, через що обмінюються. Подивимося на найпростіший приклад:
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
Runnable task = () -> {
try {
Thread thread = Thread.currentThread();
String withThreadName = exchanger.exchange(thread.getName());
System.out.println(thread.getName() + " обменялся с " + withThreadName);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
new Thread(task).start();
new Thread(task).start();
}
Тут ми запускаємо два потоки. Кожен з них виконує метод exchange і чекає, коли інший потік так само виконає метод exchange. Отже, потоки обміняються між собою переданими аргументами. Цікава річ. Чи вона вам нічого не нагадує? А нагадує він
SynchronousQueue
, яка лежить в основі
cachedThreadPool
'а. Для наочності приклад:
public static void main(String[] args) throws InterruptedException {
SynchronousQueue<String> queue = new SynchronousQueue<>();
Runnable task = () -> {
try {
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
};
new Thread(task).start();
queue.put("Message");
}
У прикладі видно, що запустивши новий потік, цей потік піде в очікування, т.к. у черзі буде порожньо. А далі
main
потік покладе на чергу текст "Message". При цьому він сам зупиниться на час, який потрібно, доки не отримають із черги цей текстовий елемент. З цієї теми також можна почитати "
SynchronousQueue Vs Exchanger ".
Phaser
І насамкінець найсолодше —
Phaser
. Нам знадобиться імпорт класу
java.util.concurrent.Phaser
. Подивимося на простий приклад:
public static void main(String[] args) throws InterruptedException {
Phaser phaser = new Phaser();
phaser.register();
System.out.println("Phasecount is " + phaser.getPhase());
testPhaser(phaser);
testPhaser(phaser);
testPhaser(phaser);
Thread.sleep(3000);
phaser.arriveAndDeregister();
System.out.println("Phasecount is " + phaser.getPhase());
}
private static void testPhaser(final Phaser phaser) {
phaser.register();
new Thread(() -> {
String name = Thread.currentThread().getName();
System.out.println(name + " arrived");
phaser.arriveAndAwaitAdvance();
System.out.println(name + " after passing barrier");
}).start();
}
З прикладу видно, що бар'єр при використанні
Phaser
проривається, коли кількість реєстрацій збігається з кількістю прибулих до бар'єру. Детальніше можна ознайомитися з
Phaser
'ом у статті з хабра "
Новий синхронізатор Phaser ".
Підсумки
Як очевидно з прикладів, існують різні способи синхронізації потоків. Раніше я постарався вже згадати щось із багатопоточності, сподіваюся, минулі частини були корисні. Кажуть, що шлях до багатопоточності починається з книги Java Concurrency in Practice. Хоча вона вийшла у 2006 році, люди відповідають, що книга є досить фундаментальною і досі тримає удар. Наприклад, можна прочитати обговорення тут: "
Is Java Concurrency In Practice still valid? ". Також корисно прочитати посилання із обговорення. Наприклад, там є посилання на книгу "
The Well-Grounded Java Developer ", в якій варто звернутися на "
Chapter 4. Modern concurrency ". Є ще цілий огляд на цю тему: "
Там також є поради з приводу того, що ще слід почитати, щоб дійсно зрозуміти цю тему. Після цього, можна придивитися до такої чудової книги, як OCA OCP JavaSE 8 Programmer Practice Tests
. І там є тести в "∫". У цій книзі є як питання, так і відповіді з поясненням. Наприклад:
Багато хто може почати говорити, що це чергове заучування методів. З одного боку - так. З іншого боку, на це питання можна дати відповідь, згадавши, що
ExecutorService
- це свого роду "апгрейд"
Executor
'а. А
Executor
покликаний просто приховати спосіб створення потоків, але не основний спосіб їх виконання, тобто запуск
Runnable
у
execute(Callable)
новому
ExecutorService
потоці
Executor
.у просто додали методи
submit
, які вміють повертати
Future
. Як бачите, ми можемо і завчити список методів, але набагато простіше здогадатися, знаючи природу самих класів. Ну і трохи додаткових матеріалів на тему:
#Viacheslav
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ