JavaRush/Java блог/Random/Thread'ом Java не испортишь: Часть VI — К барьеру!
Viacheslav
3 уровень

Thread'ом Java не испортишь: Часть VI — К барьеру!

Статья из группы Random
участников

Вступление

Потоки — штука интересная. В прошлых обзорах мы рассмотрели некоторые доступные средства для реализации многопоточности. Давайте посмотрим, что мы можем сделать ещё интересного. К этому моменту мы многое что знаем. Например, из "Thread'ом Java не испортишь: Часть I — потоки" мы знаем, что поток — это Thread. Мы знаем, что поток выполняет некоторую задачу. Если мы хотим, чтобы нашу задачи могли запустить (run), то мы должны указать потоку некий Runnable. Thread'ом Java не испортишь: Часть VI — К барьеру! - 1Чтобы вспомнить, можем воспользоваться Tutorialspoint Java Online Compiler'ом:
public static void main(String []args){
	Runnable task = () -> {
 		Thread thread = Thread.currentThread();
		System.out.println("Hello from " + thread.getName());
	};
	Thread thread = new Thread(task);
	thread.start();
}
Так же мы знаем о том, что у нас есть такое понятие, как лок. О нем мы читали в "Thread'ом Java не испортишь: Часть II — синхронизация". Поток может занимать лок и тогда другой поток, который попытается занять лок, будет вынужден ждать освобождения лока:
import java.util.concurrent.locks.*;

public class HelloWorld{
	public static void main(String []args){
		Lock lock = new ReentrantLock();
		Runnable task = () -> {
			lock.lock();
			Thread thread = Thread.currentThread();
			System.out.println("Hello from " + thread.getName());
			lock.unlock();
		};
		Thread thread = new Thread(task);
		thread.start();
	}
}
Думаю, пора поговорить о том, что мы ещё можем интересное сделать.

Семафоры

Самое простое средство контроля за тем, сколько потоков могут одновременно работать — семафор. Как на железной дороге. Горит зелёный — можно. Горит красный — ждём. Что мы ждём от семафора? Разрешения. Разрешение на английском — permit. Чтобы получить разрешение — его нужно получить, что на английском будет acquire. А когда разрешение больше не нужно мы его должны отдать, то есть освободить его или избавится от него, что на английском будет release. Посмотрим, как это работает. Нам потребуется импорт класса java.util.concurrent.Semaphore. Пример:
public static void main(String[] args) throws InterruptedException {
	Semaphore semaphore = new Semaphore(0);
	Runnable task = () -> {
		try {
			semaphore.acquire();
			System.out.println("Finished");
			semaphore.release();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	};
	new Thread(task).start();
	Thread.sleep(5000);
	semaphore.release(1);
}
Как видим, запомнив английские слова, мы понимаем, как работает семафор. Интересно, что главное условие — на "счету" семафора должен быть положительное количество permit'ов. Поэтому, инициировать его можно и с минусом. И запрашивать (acquire) можно больше, чем 1.

CountDownLatch

Следующий механизм — CountDownLatch. CountDown на английском — это отсчёт, а Latch — задвижка или защёлка. То есть если переводить, то это защёлка с отсчётом. Тут нам понадобится соответствующий импорт класса java.util.concurrent.CountDownLatch. Это похоже на бега или гонки, когда все собираются у стартовой линии и когда все готовы — дают разрешение, и все одновременно стартуют. Пример:
public static void main(String[] args) {
	CountDownLatch countDownLatch = new CountDownLatch(3);
	Runnable task = () -> {
		try {
			countDownLatch.countDown();
			System.out.println("Countdown: " + countDownLatch.getCount());
			countDownLatch.await();
			System.out.println("Finished");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	};
	for (int i = 0; i < 3; i++) {
		new Thread(task).start();
 	}
}
await на английском — ожидать. То есть мы сначала говорим countDown. Как говорит гугл переводчик, count down — "an act of counting numerals in reverse order to zero", то есть выполнить действие по обратному отсчёту, цель которого — досчитать до нуля. А дальше говорим await — то есть ожидать, пока значение счётчика не станет ноль. Интересно, что такой счётчик — одноразовый. Как сказано в JavaDoc — "When threads must repeatedly count down in this way, instead use a CyclicBarrier", то есть если нужен многоразовый счёт — надо использовать другой вариант, который называется CyclicBarrier.

CyclicBarrier

Как и следует из названия, CyclicBarrier — это циклический барьер. Нам понадобится импорт класса java.util.concurrent.CyclicBarrier. Посмотрим на пример:
public static void main(String[] args) throws InterruptedException {
	Runnable action = () -> System.out.println("На старт!");
	CyclicBarrier berrier = new CyclicBarrier(3, action);
	Runnable task = () -> {
		try {
			berrier.await();
			System.out.println("Finished");
		} catch (BrokenBarrierException | InterruptedException e) {
			e.printStackTrace();
		}
	};
	System.out.println("Limit: " + berrier.getParties());
	for (int i = 0; i < 3; i++) {
		new Thread(task).start();
	}
}
Как видим, поток выполняет await, то есть ожидает. При этом уменьшается значение барьера. Барьер считается сломанным (berrier.isBroken()), когда отсчёт дошёл до нуля. Чтобы сбросить барьер, нужно вызвать berrier.reset(), чего не хватало в CountDownLatch.

Exchanger

Следующее средство — Exchanger. Exchange с английского переводится как обмен или обмениваться. А Exchanger — обменник, то есть то, через что обмениваются. Посмотрим на простейший пример:
public static void main(String[] args) {
	Exchanger<String> exchanger = new Exchanger<>();
	Runnable task = () -> {
		try {
			Thread thread = Thread.currentThread();
			String withThreadName = exchanger.exchange(thread.getName());
			System.out.println(thread.getName() + " обменялся с " + withThreadName);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	};
	new Thread(task).start();
	new Thread(task).start();
}
Тут мы запускаем два потока. Каждый из них выполняет метод exchange и ожидает, когда другой поток так жевыполнит метод exchange. Таким образом, потоки обменяются между собой переданными аргументами. Интересная штука. Ничего ли она вам не напоминает? А напоминает он SynchronousQueue, которая лежит в основе cachedThreadPool'а. Для наглядности — пример:
public static void main(String[] args) throws InterruptedException {
	SynchronousQueue<String> queue = new SynchronousQueue<>();
	Runnable task = () -> {
		try {
			System.out.println(queue.take());
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	};
	new Thread(task).start();
	queue.put("Message");
}
В примере видно, что запустив новый поток, данный поток уйдёт в ожидание, т.к. в очереди будет пусто. А дальше main поток положит в очередь текст "Message". При этом он сам остановится на время, которой нужно, пока не получат из очереди этот текстовый элемент. По этой теме так же можно почитать "SynchronousQueue Vs Exchanger".

Phaser

И напоследок самое сладкое — Phaser. Нам понадобится импорт класса java.util.concurrent.Phaser. Посмотрим на простой пример:
public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser();
        // Вызывая метод register, мы регистрируем текущий поток (main) как участника
        phaser.register();
        System.out.println("Phasecount is " + phaser.getPhase());
        testPhaser(phaser);
        testPhaser(phaser);
        testPhaser(phaser);
        // Через 3 секунды прибываем к барьеру и снимаемся регистрацию. Кол-во прибывших = кол-во регистраций = пуск
        Thread.sleep(3000);
        phaser.arriveAndDeregister();
        System.out.println("Phasecount is " + phaser.getPhase());
    }

    private static void testPhaser(final Phaser phaser) {
        // Говорим, что будет +1 участник на Phaser
        phaser.register();
        // Запускаем новый поток
        new Thread(() -> {
            String name = Thread.currentThread().getName();
            System.out.println(name + " arrived");
            phaser.arriveAndAwaitAdvance(); //threads register arrival to the phaser.
            System.out.println(name + " after passing barrier");
        }).start();
    }
Из примера видно, что барьер при использовании Phaser'а прорывается, когда количество регистраций совпадает с количеством прибывших к барьеру. Подробнее можно ознакомиться с Phaser'ом в статье с хабра "Новый синхронизатор Phaser".

Итоги

Как видно из примеров, существуют различные способы синхронизации потоков. Ранее я постарался уже вспомнить что-то из многопоточности, надеюсь прошлые части были полезны. Говорят, что путь к многопоточности начинается с книги "Java Concurrency in Practice". Хотя она вышла в 2006 году, люди отвечают, что книга довольно фундаментальна и до сих пор держит удар. Например, можно прочитать обсуждения здесь: "Is Java Concurrency In Practice still valid?". Также полезно прочитать ссылки из обсуждения. Например, там есть ссылка на книгу "The Well-Grounded Java Developer", в которой стоит обратить на "Chapter 4. Modern concurrency". Есть ещё целый обзор на эту же тему: "Is Java cocurrency in pracitce still relevant in era of java 8". Там также есть советы по поводу того, что ещё следует почитать, чтобы действительно понять эту тему. После этого, можно присмотреться к такой замечательной книге, как "OCA OCP JavaSE 8 Programmer Practice Tests". Нас интересует вторая часть, то есть OCP. И там есть тесты в "∫". В этой книге есть как вопросы, так и ответы со объяснением. Например: Thread'ом Java не испортишь: Часть VI — К барьеру! - 3Многие могут начать говорить, что это очередное заучивание методов. С одной стороны — да. С другой стороны, на этот вопрос можно дать ответ, вспомнив, что ExecutorService — это своего рода "апгрейд" Executor'а. А Executor призван просто скрыть способ создания потоков, но не основной способ их выполнения, то есть запуск в новом потоке Runnable. Поэтому execute(Callable) и нет, т.к. в ExecutorService к Executor'у просто добавили методы submit, которые умеют возвращать Future. Как видите, мы можем и заучить список методов, но куда проще догадаться, зная природу самих классов. Ну и немного дополнительных материалов по теме: #Viacheslav
Комментарии (30)
  • популярные
  • новые
  • старые
Для того, чтобы оставить комментарий Вы должны авторизоваться
Ксюша Пиляк Java Developer
6 июня 2023, 07:38
Благодарю автора за хороший цикл статей, где собрано практически все, что надо знать по многопоточности. Для мидлов самое то. Для новичков будет сложновато.
Viter
Уровень 38
12 апреля 2023, 08:45
Как доступно, подробно и понятно объяснили про Phaser, однако можно было и немного сократить без потери информативности например "есть Phaser - гуглите"
Роман
Уровень 41
7 апреля 2023, 13:56
Тяжело...)
Миша Зефир Android Developer
8 февраля 2023, 06:51
Респект и уважением всем выжившим! Реально сложно, мне не хватило более полного раскрытия некоторых тем, которые просто проскакивали. Параллельно читал МЕТАНИТ и смотрел видео на ютубе. Вроде дошло) осталось запомнить) Автору - огромнейшее спасибо!)
Евгений Осин
Уровень 41
22 мая 2022, 13:13
Не понял про CyclicBarrier. Как видим, поток выполняет await, то есть ожидает. При этом уменьшается значение барьера. Барьер считается сломанным (berrier.isBroken()), когда отсчёт дошёл до нуля. В следствие чего барьер уменьшается? То есть как я понял чтобы код отработал, минимум 3 разных нити должны запустить task? Или я не так понял?
Evgeny Siganov QA Automation Engineer в Айтеко
18 июня 2022, 10:02
как я понял счётчик в барьере уменьшается в следствии вызова в потоках метода await()
Сергей
Уровень 51
18 июня 2022, 12:22
Когда ВСЕ участники (потоки) вызовут await(), барьер сломается, выполнится action, и потоки дальше побегут. Таким образом все потоки подравнивают у барьера как лошадок.
Роман Кончалов
Уровень 28
Expert
26 апреля 2022, 20:12
Топовые статейки, но не для начинающих, конечно)
LuneFox System Administrator в BIFIT Expert
14 октября 2021, 11:40
«Тут мы запускаем два потока. Каждый из них выполняет метод exchange и ожидает, когда другой поток так жевыполнит метод exchange. Таким образом, потоки обменяются между собой переданными аргументами. Интересная штука. Ничего ли она вам не напоминает? А напоминает он SynchronousQueue» Нет, не напоминает, потому что мы про всё это впервые в жизни слышим!
Фёдор Бэггинс
Уровень 11
22 января 2022, 06:24
говорите за себя. Забейте в гугле. Даже тут на ДР есть неплохая статейка.
LuneFox System Administrator в BIFIT Expert
22 января 2022, 11:54
В программе обучения до этого момента не предлагали это погуглить или выучить, так что - не встречалось. Личный опыт читающих не учитывается, его нет в программе.
Фёдор Бэггинс
Уровень 11
29 мая 2022, 11:34
Добро пожаловать во взрослую жизнь, дальше только так и будет.
Николай Хазов
Уровень 23
22 июня 2021, 18:21
Статьи супер. CountDownLatch не отрабатывает. Поверить не могу, что его нужно синхронизировать.
Lt_Den
Уровень 34
10 октября 2021, 14:47
у меня, еще как отрабатывает. Интересно, что если в примере CountDownLatch строку "System.out.println("Finished");" заменить на "System.out.println("Finished - "+ Thread.currentThread().getName());", то можно увидеть состояние гонки. Когда некоторые нити будут быстрее финишировать и отписываться в консоль "Finished", чем другие об уменьшении счетчика.
Кирилл Java Developer
13 апреля 2021, 21:01
Автору респект! Материал хоть и чрезвычайно сложный и огромный и постоянно открывающиеся "стеки" ссылок в и так немаленьких статьях, приводят к негодованию. )) Но, так понимаю автор пытается охватить все основные аспекты темы многопоточности, всего в нескольких статьях. И гугл тут приходит нам на помощь. Да, тяжело и бесит. Но подача столь объёмного материала в одном месте, может сэкономить немало времени.
Davilalexius System Engineer
9 декабря 2020, 19:57
Countdown: 0/Finished/Countdown: 1/Finished/Countdown: 1/Finished Countdown: 1/Finished/Countdown: 0/Finished/Countdown: 1/Finished Countdown: 2/Finished/Countdown: 1/Finished/Countdown: 0/Finished А это как понимать?) 😂 Копи-паст кода, чистый мэйн в Intellije Idea CountDownLatch countDownLatch = new CountDownLatch(3); //не отрабатывает как должна))) Вот тебе раз ))) Смешно, но помогло только
synchronized (countDownLatch) {
                countDownLatch.countDown();
                    System.out.println("Countdown: " + countDownLatch.getCount());
                }
Пример из CyclicBarrier System.out.println(berrier.isBroken()); //Всегда возвращает false, где же его читать и стоит ли читать вообще?)