JavaRush /Курсы /JAVA 25 SELF /Синхронизаторы высокого уровня

Синхронизаторы высокого уровня

JAVA 25 SELF
58 уровень , 2 лекция
Открыта

1. CountDownLatch: старт по сигналу

В многопоточном мире часто нужно устроить слаженную работу группы потоков — чтобы все начали, закончили или перешли к следующему этапу вместе. Например:

Представьте гонку. Машины стоят на старте — кто-то уже прогрел двигатель, кто-то ещё проверяет шины. Но пока судья не махнёт флагом, никто не трогается. Вот это и есть задача координации.

Или другой пример: вы готовите ужин с друзьями — кто-то режет овощи, кто-то ставит воду, кто-то ищет, где пропала соль. Главное, чтобы все закончили подготовку прежде, чем начать готовить.

Для таких случаев Java даёт нам готовые инструменты синхронизации — безопасные, понятные и без боли от wait() и notify(). Один из самых полезных — CountDownLatch. Он работает как счётчик-замок: пока не опустится до нуля, «дверь» закрыта, и никто не идёт дальше. А когда все отметились — latch открывается, и потоки синхронно рвутся в бой.

CountDownLatch

CountDownLatch — это «одноразовый вентиль», который позволяет одному или нескольким потокам ждать, пока другие потоки завершат определённое количество операций.

Это как старт марафона: все бегуны стоят на линии, ждут выстрела стартового пистолета. Как только судья выстрелил (countdown дошёл до нуля) — все бегут.

Как это вообще работает

CountDownLatch — это как стартовый свисток для потоков. При создании вы задаёте число — например, 3. Это как три сигнала, которые нужно получить, прежде чем начнётся гонка.

Потоки, которые должны ждать старта, вызывают await(). Они стоят на линии и готовы сорваться с места, но пока держат тормоза. Другие потоки, выполняя подготовку, по мере готовности вызывают countDown() — будто подают сигнал: «Я готов!».

Как только счётчик доходит до нуля — бах! — все ожидавшие потоки одновременно стартуют.

Но запомните: CountDownLatch — штука одноразовая. После того как счётчик дошёл до нуля, обратно его не вернуть. Это не револьвер, а петарда: хлопнула — и всё.

Пример: ожидание завершения N задач

import java.util.concurrent.CountDownLatch;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int workers = 3;
        CountDownLatch latch = new CountDownLatch(workers);

        for (int i = 1; i <= workers; i++) {
            int id = i;
            new Thread(() -> {
                System.out.println("Работник " + id + " начал работу");
                try { Thread.sleep(500 + id * 200); } catch (InterruptedException ignored) {}
                System.out.println("Работник " + id + " закончил работу");
                latch.countDown(); // уменьшаем счётчик
            }).start();
        }

        System.out.println("Главный поток ждёт завершения всех работников...");
        latch.await(); // ждём, пока все работники закончат
        System.out.println("Все работники закончили! Продолжаем основную работу.");
    }
}

Вывод:

Главный поток ждёт завершения всех работников...
Работник 1 начал работу
Работник 2 начал работу
Работник 3 начал работу
Работник 1 закончил работу
Работник 2 закончил работу
Работник 3 закончил работу
Все работники закончили! Продолжаем основную работу.

Пример: одновременный старт «по сигналу»

CountDownLatch startSignal = new CountDownLatch(1);

for (int i = 0; i < 5; i++) {
    new Thread(() -> {
        try {
            System.out.println(Thread.currentThread().getName() + " ждёт старта");
            startSignal.await(); // ждём сигнала
            System.out.println(Thread.currentThread().getName() + " стартует!");
        } catch (InterruptedException ignored) {}
    }).start();
}

Thread.sleep(1000);
System.out.println("Сигнал к старту!");
startSignal.countDown(); // все потоки стартуют одновременно

2. CyclicBarrier: многократные фазы, барьерные действия

CyclicBarrier: встречаемся у костра

CyclicBarrier — это место встречи потоков. Каждый из них бежит по своему маршруту, делает что-то своё, а потом все собираются у «барьера» — словно у костра в горах. Когда собрались все, барьер открывается, группа дружно идёт дальше.

Главное отличие от CountDownLatch — этот барьер можно использовать снова и снова. После каждой совместной остановки он «перезаряжается», и команда может продолжить путь к следующему этапу.

Представьте: Группа туристов идёт по длинному маршруту. Каждый движется своим темпом: кто-то фотографирует бабочек, кто-то ищет Wi‑Fi. Но на каждом перевале они встречаются у костра, ждут друг друга и решают, куда идти дальше. Вот это и есть CyclicBarrier в действии.

Как это работает

Вы создаёте барьер и указываете, сколько участников должно собраться, например, 4. Каждый поток, дойдя до контрольной точки, вызывает await() — и ждёт остальных. Когда все четверо собрались, барьер «щёлкает» и отпускает всех дальше.

Можно даже задать «барьерное действие» — кусочек кода, который выполнится ровно один раз, когда группа собралась. Например, разжечь тот самый костёр или записать лог: «Этап завершён, идём дальше». Для этого в конструктор передают Runnable.

Важно: в отличие от одноразового CountDownLatch, CyclicBarrier многоразовый. После каждого «сбора» он снова готов к следующему этапу — как вечный походный костёр, который можно разжечь снова и снова.

Пример: синхронизация фаз

import java.util.concurrent.CyclicBarrier;

public class BarrierDemo {
    public static void main(String[] args) {
        int parties = 3;
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
            System.out.println("Все подошли к барьеру! Начинаем новую фазу.");
        });

        for (int i = 1; i <= parties; i++) {
            int id = i;
            new Thread(() -> {
                try {
                    System.out.println("Поток " + id + " работает в фазе 1");
                    Thread.sleep(300 + id * 200);
                    System.out.println("Поток " + id + " ждёт барьер");
                    barrier.await(); // ждём остальных

                    System.out.println("Поток " + id + " работает в фазе 2");
                    Thread.sleep(200 + id * 100);
                    System.out.println("Поток " + id + " ждёт барьер (2)");
                    barrier.await(); // снова ждём

                    System.out.println("Поток " + id + " завершил работу");
                } catch (Exception e) {
                    System.out.println("Ошибка: " + e);
                }
            }).start();
        }
    }
}

Вывод:

Поток 1 работает в фазе 1
Поток 2 работает в фазе 1
Поток 3 работает в фазе 1
Поток 1 ждёт барьер
Поток 2 ждёт барьер
Поток 3 ждёт барьер
Все подошли к барьеру! Начинаем новую фазу.
Поток 1 работает в фазе 2
...

Барьерное действие

Можно передать в конструктор CyclicBarrier действие (Runnable), которое выполнится один раз, когда все потоки подошли к барьеру (например, обновить состояние, вывести лог).

Ловушки: что если один поток упал?

Если один из потоков выбросил исключение или не дошёл до барьера, остальные будут ждать вечно — или получат BrokenBarrierException. Барьер «ломается», и его нужно пересоздавать.

Вот как этот раздел можно переписать в более живом, образном и разговорном стиле — чтобы он звучал как естественное продолжение «оркестровой» линии:

3. Phaser: умелый дирижёр большого концерта

Phaser — это что-то вроде «супербарьера». Он объединяет в себе лучшие черты CountDownLatch и CyclicBarrier, но при этом куда гибче. Это как оркестр, где музыканты могут приходить и уходить между частями концерта, а дирижёр всё равно следит, чтобы каждая часть началась тогда, когда все готовы.

В отличие от обычного барьера, Phaser умеет работать по этапам — фазы сменяются одна за другой. Кто-то играет только в первой части, кто-то подключается позже, а кто-то уходит раньше — всё это Phaser переживает спокойно.

Как это работает

Сначала создаётся Phaser, обычно с заданным количеством участников — parties. Каждый поток регистрируется (register()), выполняет свою партию и в конце фазы вызывает arriveAndAwaitAdvance() — сообщает, что закончил, и ждёт остальных. Когда все дошли до этого момента, Phaser переключается на следующую фазу, и процесс повторяется.

Если участник больше не нужен — он может красиво «поклониться» и уйти со сцены через arriveAndDeregister(). Новые, наоборот, могут присоединиться прямо во время концерта — через register().

Когда Phaser лучше, чем Barrier

Phaser стоит выбрать, если ваша программа живёт не в одном ритме, а в нескольких:

  • количество потоков меняется на ходу,
  • есть несколько этапов, и не все участники обязаны участвовать во всех,
  • или просто хочется максимальной гибкости без лишней возни с ручной синхронизацией.

По сути, Phaser — это дирижёр. Он не только машет палочкой, но и умеет адаптироваться к составу оркестра, к числу частей концерта и даже к тому, что кто-то опоздал или вышел пораньше.

Пример: этапная обработка с динамическим числом потоков

import java.util.concurrent.Phaser;

public class PhaserDemo {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(1); // главный поток

        for (int i = 1; i <= 3; i++) {
            phaser.register(); // регистрируем участника
            int id = i;
            new Thread(() -> {
                for (int phase = 1; phase <= 2; phase++) {
                    System.out.println("Поток " + id + " работает в фазе " + phase);
                    try { Thread.sleep(200 + id * 100); } catch (InterruptedException ignored) {}
                    phaser.arriveAndAwaitAdvance(); // ждём остальных
                }
                System.out.println("Поток " + id + " завершил работу");
                phaser.arriveAndDeregister(); // выходим из phaser
            }).start();
        }

        // Главный поток тоже участвует в фазах
        for (int phase = 1; phase <= 2; phase++) {
            phaser.arriveAndAwaitAdvance();
            System.out.println("Главный поток: завершена фаза " + phase);
        }
        phaser.arriveAndDeregister();
        System.out.println("Все фазы завершены!");
    }
}

Особенности:

  • Можно добавлять/удалять участников на лету.
  • Можно узнать номер текущей фазы: phaser.getPhase().
  • Можно завершить phaser: phaser.forceTermination().

4. Exchanger: обмен порциями данных между потоками

Exchanger<T> — это синхронизатор для обмена данными между двумя потоками. Каждый поток вызывает exchange(data), и когда оба потока встретились, они обмениваются своими данными.

Аналогия: Два курьера встречаются на перекрёстке и обмениваются пакетами.

Как работает?

  • Один поток вызывает exchange(data1) — ждёт второго.
  • Второй поток вызывает exchange(data2) — оба получают данные друг друга.
  • Если второй поток не пришёл — первый ждёт (можно задать тайм‑аут).

Пример: обмен буферами между producer и consumer

import java.util.concurrent.Exchanger;

public class ExchangerDemo {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        // Producer
        new Thread(() -> {
            String data = "Данные от producer";
            try {
                System.out.println("Producer: отправляет данные");
                String response = exchanger.exchange(data);
                System.out.println("Producer: получил ответ: " + response);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // Consumer
        new Thread(() -> {
            try {
                String received = exchanger.exchange("Ответ от consumer");
                System.out.println("Consumer: получил данные: " + received);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

Вывод:

Producer: отправляет данные
Consumer: получил данные: Данные от producer
Producer: получил ответ: Ответ от consumer

Применение:

  • Обмен буферами между потоками (например, один читает из файла, другой пишет в сеть).
  • Синхронизация фаз между двумя потоками.

5. Практика: параллельная конвейерная обработка

Задача: игровой «тик» (фазы)

Допустим, у нас есть несколько потоков, каждый из которых отвечает за свою часть игрового мира (например, физика, AI, рендеринг). Все должны синхронизироваться на каждом «тике» (фазе), чтобы не было рассинхронизации.

Решение: Используем CyclicBarrier или Phaser.

import java.util.concurrent.CyclicBarrier;

public class GameTickDemo {
    public static void main(String[] args) {
        int subsystems = 3;
        CyclicBarrier barrier = new CyclicBarrier(subsystems, () -> {
            System.out.println("Все подсистемы завершили тик. Начинаем следующий.");
        });

        for (int i = 1; i <= subsystems; i++) {
            int id = i;
            new Thread(() -> {
                for (int tick = 1; tick <= 5; tick++) {
                    System.out.println("Подсистема " + id + " работает в тике " + tick);
                    try { Thread.sleep(100 + id * 50); } catch (InterruptedException ignored) {}
                    try {
                        barrier.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}

Задача: «Вентиль» для большого числа воркеров

Допустим, у нас есть 100 потоков‑воркеров, которые должны стартовать одновременно после подготовки (например, тест нагрузки).

Решение: Используем CountDownLatch.

import java.util.concurrent.CountDownLatch;

public class MassStartDemo {
    public static void main(String[] args) throws InterruptedException {
        int workers = 100;
        CountDownLatch ready = new CountDownLatch(workers);
        CountDownLatch start = new CountDownLatch(1);

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                System.out.println("Поток готов к старту");
                ready.countDown(); // сигнализируем о готовности
                try {
                    start.await(); // ждём общего сигнала
                    System.out.println("Поток стартует!");
                } catch (InterruptedException ignored) {}
            }).start();
        }

        ready.await(); // ждём, пока все потоки будут готовы
        System.out.println("Все готовы! СТАРТ!");
        start.countDown(); // даём сигнал к старту
    }
}

6. Типичные ошибки при работе с синхронизаторами

Ошибка №1: Использование CountDownLatch как многоразового барьера.
CountDownLatch — одноразовый! После достижения нуля его нельзя «перезарядить». Для многоразовых фаз используйте CyclicBarrier или Phaser.

Ошибка №2: Не обработали исключения (InterruptedException, BrokenBarrierException).
Методы await() могут выбрасывать исключения — всегда обрабатывайте их, иначе поток может «зависнуть» или завершиться с ошибкой. Следите за InterruptedException и BrokenBarrierException.

Ошибка №3: Один из потоков не дошёл до барьера.
Если один поток «упал» или не вызвал await(), остальные будут ждать вечно (или получат BrokenBarrierException). Следите за тем, чтобы все участники доходили до барьера.

Ошибка №4: Забыли deregister() в Phaser.
Если поток завершил работу, но не вызвал arriveAndDeregister(), Phaser будет ждать «мертвого» участника. Всегда корректно удаляйте потоки из Phaser.

Ошибка №5: Использование Exchanger более чем для двух потоков.
Exchanger работает только для обмена между двумя потоками. Если потоков больше — получите deadlock.

Ошибка №6: Смешивание разных синхронизаторов без понимания их работы.
Не стоит одновременно использовать несколько разных барьеров/латчей для одной и той же группы потоков — это может привести к путанице и зависаниям.

1
Задача
JAVA 25 SELF, 58 уровень, 2 лекция
Недоступна
Симфония Потоков: Театральная Премьера 🎭
Симфония Потоков: Театральная Премьера 🎭
1
Задача
JAVA 25 SELF, 58 уровень, 2 лекция
Недоступна
Секретный Обмен: Операция "Мгновенный Курьер" 🕵️
Секретный Обмен: Операция "Мгновенный Курьер" 🕵️
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ