1. CountDownLatch: старт за сигналом
У багатопоточному світі часто потрібно налагодити злагоджену роботу групи потоків — щоб усі почали, закінчили або перейшли до наступного етапу разом. Наприклад:
Уявіть перегони. Автівки стоять на старті — хтось уже прогрів двигун, хтось ще перевіряє шини. Але поки суддя не махне прапором, ніхто не рушає. Ось це і є завдання координації.
Або інший приклад: ви готуєте вечерю з друзями — хтось ріже овочі, хтось ставить воду, хтось шукає, куди зникла сіль. Головне — щоб усі закінчили підготовку, перш ніж почати готувати.
Для таких випадків Java дає нам готові інструменти синхронізації — безпечні, зрозумілі й без болю від wait() і notify(). Один із найкорисніших — CountDownLatch. Він працює як лічильник-замок: доки не опуститься до нуля, «двері» зачинені, і ніхто не йде далі. А коли всі відмітилися — засувка відкривається, і потоки синхронно рвуться в бій.
CountDownLatch
CountDownLatch — це «одноразовий вентиль», який дозволяє одному або кільком потокам чекати, поки інші потоки завершать певну кількість операцій.
Це як старт марафону: усі бігуни стоять на лінії, чекають пострілу стартового пістолета. Щойно суддя вистрілив (лічильник дійшов до нуля) — усі біжать.
Як це взагалі працює
CountDownLatch — це як стартовий свисток для потоків. Під час створення ви задаєте число — наприклад, 3. Це як три сигнали, які потрібно отримати, перш ніж розпочнеться гонка.
Потоки, які мають чекати старту, викликають await(). Вони стоять на лінії та готові зірватися з місця, але поки що тримають гальма. Інші потоки, виконуючи підготовку, у міру готовності викликають countDown() — ніби подають сигнал: «Я готовий!»
Щойно лічильник доходить до нуля — бах! — усі потоки, що чекали, одночасно стартують.
Але запам’ятайте: CountDownLatch — річ одноразова. Після того як лічильник дійшов до нуля, назад його не повернути. Це не револьвер, а петарда: хлопнула — і все.
Приклад: очікування завершення N задач
import java.util.concurrent.CountDownLatch;
public class LatchDemo {
public static void main(String[] args) throws InterruptedException {
int workers = 3;
CountDownLatch latch = new CountDownLatch(workers);
for (int i = 1; i <= workers; i++) {
int id = i;
new Thread(() -> {
System.out.println("Працівник " + id + " розпочав роботу");
try { Thread.sleep(500 + id * 200); } catch (InterruptedException ignored) {}
System.out.println("Працівник " + id + " закінчив роботу");
latch.countDown(); // зменшуємо лічильник
}).start();
}
System.out.println("Головний потік чекає на завершення всіх працівників...");
latch.await(); // чекаємо, доки всі працівники завершать
System.out.println("Усі працівники завершили! Продовжуємо основну роботу.");
}
}
Виведення:
Головний потік чекає на завершення всіх працівників...
Працівник 1 розпочав роботу
Працівник 2 розпочав роботу
Працівник 3 розпочав роботу
Працівник 1 закінчив роботу
Працівник 2 закінчив роботу
Працівник 3 закінчив роботу
Усі працівники завершили! Продовжуємо основну роботу.
Приклад: одночасний старт «за сигналом»
CountDownLatch startSignal = new CountDownLatch(1);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " очікує старту");
startSignal.await(); // чекаємо сигналу
System.out.println(Thread.currentThread().getName() + " стартує!");
} catch (InterruptedException ignored) {}
}).start();
}
Thread.sleep(1000);
System.out.println("Сигнал до старту!");
startSignal.countDown(); // усі потоки стартують одночасно
2. CyclicBarrier: багаторазові фази, бар’єрні дії
CyclicBarrier: зустрічаємося біля вогнища
CyclicBarrier — це місце зустрічі потоків. Кожен із них біжить своїм маршрутом, робить щось своє, а потім усі збираються біля «бар’єра» — немов біля вогнища у горах. Коли зібралися всі, бар’єр відкривається, група дружно йде далі.
Головна відмінність від CountDownLatch — цей бар’єр можна використовувати знову і знову. Після кожної спільної зупинки він «перезаряджається», і команда може продовжити шлях до наступного етапу.
Уявіть собі: Група туристів іде довгим маршрутом. Кожен рухається у своєму темпі: хтось фотографує метеликів, хтось шукає Wi‑Fi. Але на кожному перевалі вони зустрічаються біля вогнища, чекають одне одного і вирішують, куди йти далі. Ось це і є CyclicBarrier у дії.
Як це працює
Ви створюєте бар’єр і вказуєте, скільки учасників має зібратися, наприклад, 4. Кожен потік, дійшовши до контрольної точки, викликає await() — і чекає інших. Коли всі четверо зібралися, бар’єр «клацає» і відпускає всіх далі.
Можна навіть задати «бар’єрну дію» — шматочок коду, який виконається рівно один раз, коли група зібралася. Наприклад, розпалити те саме вогнище або записати лог: «Етап завершено, йдемо далі». Для цього в конструктор передають Runnable.
Важливо: на відміну від одноразового CountDownLatch, CyclicBarrier багаторазовий. Після кожного «збору» він знову готовий до наступного етапу — як вічне похідне вогнище, яке можна розпалювати знову і знову.
Приклад: синхронізація фаз
import java.util.concurrent.CyclicBarrier;
public class BarrierDemo {
public static void main(String[] args) {
int parties = 3;
CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
System.out.println("Усі підійшли до бар’єра! Починаємо нову фазу.");
});
for (int i = 1; i <= parties; i++) {
int id = i;
new Thread(() -> {
try {
System.out.println("Потік " + id + " працює у фазі 1");
Thread.sleep(300 + id * 200);
System.out.println("Потік " + id + " чекає на бар’єр");
barrier.await(); // чекаємо інших
System.out.println("Потік " + id + " працює у фазі 2");
Thread.sleep(200 + id * 100);
System.out.println("Потік " + id + " чекає на бар’єр (2)");
barrier.await(); // знову чекаємо
System.out.println("Потік " + id + " завершив роботу");
} catch (Exception e) {
System.out.println("Помилка: " + e);
}
}).start();
}
}
}
Виведення:
Потік 1 працює у фазі 1
Потік 2 працює у фазі 1
Потік 3 працює у фазі 1
Потік 1 чекає на бар’єр
Потік 2 чекає на бар’єр
Потік 3 чекає на бар’єр
Усі підійшли до бар’єра! Починаємо нову фазу.
Потік 1 працює у фазі 2
...
Бар’єрна дія
Можна передати в конструктор CyclicBarrier дію (Runnable), яка виконається один раз, коли всі потоки підійшли до бар’єра (наприклад, оновити стан, вивести лог).
Пастки: що, якщо один потік впав?
Якщо один із потоків кинув виняток або не дійшов до бар’єра, решта чекатимуть вічно — або отримають BrokenBarrierException. Бар’єр «ламається», і його потрібно створювати заново.
Ось як цей розділ можна переписати в більш живому, образному та розмовному стилі — щоб він звучав як природне продовження «оркестрової» лінії:
3. Phaser: вмілий диригент великого концерту
Phaser — це щось на кшталт «супербар’єра». Він поєднує в собі найкращі риси CountDownLatch і CyclicBarrier, але при цьому значно гнучкіший. Це як оркестр, де музиканти можуть приходити й іти між частинами концерту, а диригент усе одно стежить, щоб кожна частина почалася тоді, коли всі готові.
На відміну від звичайного бар’єра, Phaser уміє працювати етапами — фази змінюють одна одну. Хтось грає лише в першій частині, хтось підключається пізніше, а хтось іде раніше — усе це Phaser спокійно переживає.
Як це працює
Спочатку створюється Phaser, зазвичай із заданою кількістю учасників — parties. Кожен потік реєструється (register()), виконує свою партію і наприкінці фази викликає arriveAndAwaitAdvance() — повідомляє, що закінчив, і чекає інших. Коли всі дійшли до цього моменту, Phaser перемикається на наступну фазу, і процес повторюється.
Якщо учасник більше не потрібен — він може гарно «вклонитися» і піти зі сцени через arriveAndDeregister(). Нові, навпаки, можуть приєднатися прямо під час концерту — через register().
Коли Phaser кращий за Barrier
Phaser варто обрати, якщо ваша програма живе не в одному ритмі, а в кількох:
- кількість потоків змінюється на ходу,
- є кілька етапів, і не всі учасники зобов’язані брати участь у всіх,
- або просто хочеться максимальної гнучкості без зайвої мороки з ручною синхронізацією.
По суті, Phaser — це диригент. Він не тільки махає паличкою, а й уміє адаптуватися до складу оркестру, до кількості частин концерту і навіть до того, що хтось запізнився чи вийшов раніше.
Приклад: етапна обробка з динамічною кількістю потоків
import java.util.concurrent.Phaser;
public class PhaserDemo {
public static void main(String[] args) {
Phaser phaser = new Phaser(1); // головний потік
for (int i = 1; i <= 3; i++) {
phaser.register(); // реєструємо учасника
int id = i;
new Thread(() -> {
for (int phase = 1; phase <= 2; phase++) {
System.out.println("Потік " + id + " працює у фазі " + phase);
try { Thread.sleep(200 + id * 100); } catch (InterruptedException ignored) {}
phaser.arriveAndAwaitAdvance(); // чекаємо інших
}
System.out.println("Потік " + id + " завершив роботу");
phaser.arriveAndDeregister(); // виходимо з phaser
}).start();
}
// Головний потік теж бере участь у фазах
for (int phase = 1; phase <= 2; phase++) {
phaser.arriveAndAwaitAdvance();
System.out.println("Головний потік: фаза " + phase + " завершена");
}
phaser.arriveAndDeregister();
System.out.println("Усі фази завершено!");
}
}
Особливості:
- Можна додавати/видаляти учасників на льоту.
- Можна дізнатися номер поточної фази: phaser.getPhase().
- Можна завершити phaser: phaser.forceTermination().
4. Exchanger: обмін порціями даних між потоками
Exchanger<T> — це синхронізатор для обміну даними між двома потоками. Кожен потік викликає exchange(data), і коли обидва потоки зустрілися, вони обмінюються своїми даними.
Аналогія: Двоє кур’єрів зустрічаються на перехресті та обмінюються пакетами.
Як працює?
- Один потік викликає exchange(data1) — чекає на другого.
- Другий потік викликає exchange(data2) — обидва отримують дані одне одного.
- Якщо другий потік не прийшов — перший чекає (можна задати тайм-аут).
Приклад: обмін буферами між producer і consumer
import java.util.concurrent.Exchanger;
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
// Producer
new Thread(() -> {
String data = "Дані від producer";
try {
System.out.println("Producer: надсилає дані");
String response = exchanger.exchange(data);
System.out.println("Producer: отримав відповідь: " + response);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// Consumer
new Thread(() -> {
try {
String received = exchanger.exchange("Відповідь від consumer");
System.out.println("Consumer: отримав дані: " + received);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
Виведення:
Producer: надсилає дані
Consumer: отримав дані: Дані від producer
Producer: отримав відповідь: Відповідь від consumer
Застосування:
- Обмін буферами між потоками (наприклад, один читає з файла, інший пише у мережу).
- Синхронізація фаз між двома потоками.
5. Практика: паралельна конвеєрна обробка
Завдання: «ігровий тік» (фази)
Припустімо, у нас є кілька потоків, кожен із яких відповідає за свою частину ігрового світу (наприклад, фізика, AI, рендеринг). Усі мають синхронізуватися на кожному «тіку» (фазі), щоб не було розсинхронізації.
Рішення: Використовуємо CyclicBarrier або Phaser.
import java.util.concurrent.CyclicBarrier;
public class GameTickDemo {
public static void main(String[] args) {
int subsystems = 3;
CyclicBarrier barrier = new CyclicBarrier(subsystems, () -> {
System.out.println("Усі підсистеми завершили тік. Починаємо наступний.");
});
for (int i = 1; i <= subsystems; i++) {
int id = i;
new Thread(() -> {
for (int tick = 1; tick <= 5; tick++) {
System.out.println("Підсистема " + id + " працює у тіку " + tick);
try { Thread.sleep(100 + id * 50); } catch (InterruptedException ignored) {}
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
}
Завдання: «вентиль» для великої кількості воркерів
Припустімо, у нас є 100 потоків‑воркерів, які мають стартувати одночасно після підготовки (наприклад, навантажувальний тест).
Рішення: Використовуємо CountDownLatch.
import java.util.concurrent.CountDownLatch;
public class MassStartDemo {
public static void main(String[] args) throws InterruptedException {
int workers = 100;
CountDownLatch ready = new CountDownLatch(workers);
CountDownLatch start = new CountDownLatch(1);
for (int i = 0; i < workers; i++) {
new Thread(() -> {
System.out.println("Потік готовий до старту");
ready.countDown(); // сигналізуємо про готовність
try {
start.await(); // чекаємо загального сигналу
System.out.println("Потік стартує!");
} catch (InterruptedException ignored) {}
}).start();
}
ready.await(); // чекаємо, доки всі потоки будуть готові
System.out.println("Усі готові! СТАРТ!");
start.countDown(); // даємо сигнал до старту
}
}
6. Типові помилки під час роботи з синхронізаторами
Помилка № 1: Використання CountDownLatch як багаторазового бар’єра.
CountDownLatch — одноразовий! Після досягнення нуля його не можна «перезарядити». Для багаторазових фаз використовуйте CyclicBarrier або Phaser.
Помилка № 2: Не обробили винятки (InterruptedException, BrokenBarrierException).
Методи await() можуть кидати винятки — завжди обробляйте їх, інакше потік може «зависнути» або завершитися з помилкою. Стежте за InterruptedException і BrokenBarrierException.
Помилка № 3: Один із потоків не дійшов до бар’єра.
Якщо один потік «впав» або не викликав await(), інші чекатимуть вічно (або отримають BrokenBarrierException). Стежте за тим, щоб усі учасники доходили до бар’єра.
Помилка № 4: Забули deregister() у Phaser.
Якщо потік завершив роботу, але не викликав arriveAndDeregister(), Phaser чекатиме «мертвого» учасника. Завжди коректно видаляйте потоки з Phaser.
Помилка № 5: Використання Exchanger більш ніж для двох потоків.
Exchanger працює тільки для обміну між двома потоками. Якщо потоків більше — отримаєте deadlock.
Помилка № 6: Змішування різних синхронізаторів без розуміння їхньої роботи.
Не варто одночасно використовувати кілька різних бар’єрів/латчів для однієї і тієї ж групи потоків — це може призвести до плутанини та зависань.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ