JavaRush /Java блог /Random UA /Thread'ом Java не зіпсуєш: Частина VI - До бар'єру!
Viacheslav
3 рівень

Thread'ом Java не зіпсуєш: Частина VI - До бар'єру!

Стаття з групи Random UA

Вступ

Потоки – штука цікава. У попередніх оглядах ми розглянули деякі доступні засоби реалізації багатопоточності. Погляньмо, що ми можемо зробити ще цікавого. На цей момент ми багато що знаємо. Наприклад, з " Thread'ом Java не зіпсуєш: Частина I - потоки " ми знаємо, що потік - це Thread. Ми знаємо, що потік виконує певне завдання. Якщо хочемо, щоб наше завдання могли запустити ( run), ми повинні вказати потоку якийсь Runnable. Thread'ом Java не зіпсуєш: Частина VI - До бар'єру!  - 1Щоб згадати, можемо скористатися Tutorialspoint Java Online Compiler 'ом:
public static void main(String []args){
	Runnable task = () -> {
 		Thread thread = Thread.currentThread();
		System.out.println("Hello from " + thread.getName());
	};
	Thread thread = new Thread(task);
	thread.start();
}
Також ми знаємо про те, що у нас є таке поняття, як лок. Про нього ми читали в " Thread'ом Java не зіпсуєш: Частина II - синхронізація ". Потік може займати лок і тоді інший потік, який спробує зайняти лок, буде змушений чекати на звільнення локу:
import java.util.concurrent.locks.*;

public class HelloWorld{
	public static void main(String []args){
		Lock lock = new ReentrantLock();
		Runnable task = () -> {
			lock.lock();
			Thread thread = Thread.currentThread();
			System.out.println("Hello from " + thread.getName());
			lock.unlock();
		};
		Thread thread = new Thread(task);
		thread.start();
	}
}
Думаю, настав час поговорити про те, що ми ще можемо цікаве зробити.

Семафори

Найпростіший засіб контролю над тим, скільки потоків можуть одночасно працювати — семафор. Як на залізниці. Горить зелений – можна. Горить червоний – чекаємо. Що ми чекаємо на семафор? Дозволи. Дозвіл англійською - permit. Щоб отримати дозвіл - його потрібно отримати, що англійською буде acquire. А коли дозвіл більше не потрібно ми його повинні віддати, тобто звільнити його або позбудеться його, що англійською буде release. Подивимося, як це працює. Нам буде потрібно імпорт класу java.util.concurrent.Semaphore. Приклад:
public static void main(String[] args) throws InterruptedException {
	Semaphore semaphore = new Semaphore(0);
	Runnable task = () -> {
		try {
			semaphore.acquire();
			System.out.println("Finished");
			semaphore.release();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	};
	new Thread(task).start();
	Thread.sleep(5000);
	semaphore.release(1);
}
Як бачимо, запам'ятавши англійські слова, ми розуміємо як працює семафор. Цікаво, що головна умова - на "рахунку" семафору має бути позитивна кількість permit'ів. Тому ініціювати його можна і з мінусом. І вимагати (acquire) можна більше, ніж 1.

CountDownLatch

Наступний механізм - CountDownLatch. CountDown англійською – це відлік, а Latch – засувка або клямка. Тобто якщо перекладати, це засувка з відліком. Тут нам знадобиться відповідний імпорт класу java.util.concurrent.CountDownLatch. Це схоже на біги чи гонки, коли всі збираються біля стартової лінії і коли всі готові – дають дозвіл, і всі одночасно стартують. Приклад:
public static void main(String[] args) {
	CountDownLatch countDownLatch = new CountDownLatch(3);
	Runnable task = () -> {
		try {
			countDownLatch.countDown();
			System.out.println("Countdown: " + countDownLatch.getCount());
			countDownLatch.await();
			System.out.println("Finished");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	};
	for (int i = 0; i < 3; i++) {
		new Thread(task).start();
 	}
}
await англійською - очікувати. Тобто ми спочатку говоримо countDown. Як каже ґуґл перекладач, count down - "an act of counting numerals in reverse order to zero", тобто виконати дію за зворотним відліком, мета якого - дорахувати до нуля. А далі говоримо await— тобто чекати, доки значення лічильника не стане нульовим. Цікаво, що такий лічильник є одноразовим. Як сказано в JavaDoc - "When threads must repeatedly count down in this way, instead use a CyclicBarrier", тобто якщо потрібен багаторазовий рахунок - треба використовувати інший варіант, який називається CyclicBarrier.

CyclicBarrier

Як і випливає з назви, CyclicBarrierце циклічний бар'єр. Нам знадобиться імпорт класу java.util.concurrent.CyclicBarrier. Подивимося на приклад:
public static void main(String[] args) throws InterruptedException {
	Runnable action = () -> System.out.println("На старт!");
	CyclicBarrier berrier = new CyclicBarrier(3, action);
	Runnable task = () -> {
		try {
			berrier.await();
			System.out.println("Finished");
		} catch (BrokenBarrierException | InterruptedException e) {
			e.printStackTrace();
		}
	};
	System.out.println("Limit: " + berrier.getParties());
	for (int i = 0; i < 3; i++) {
		new Thread(task).start();
	}
}
Як бачимо, потік виконує await, тобто чекає. У цьому зменшується значення бар'єру. Бар'єр вважається зламаним ( berrier.isBroken()), коли відлік дійшов до нуля. Щоб скинути бар'єр, потрібно викликати berrier.reset(), чого не вистачало CountDownLatch.

Exchanger

Наступний засіб - Exchanger. Exchange з англійської перекладається як обмін або обмінюватися. А Exchangerобмінник, тобто те, через що обмінюються. Подивимося на найпростіший приклад:
public static void main(String[] args) {
	Exchanger<String> exchanger = new Exchanger<>();
	Runnable task = () -> {
		try {
			Thread thread = Thread.currentThread();
			String withThreadName = exchanger.exchange(thread.getName());
			System.out.println(thread.getName() + " обменялся с " + withThreadName);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	};
	new Thread(task).start();
	new Thread(task).start();
}
Тут ми запускаємо два потоки. Кожен з них виконує метод exchange і чекає, коли інший потік так само виконає метод exchange. Отже, потоки обміняються між собою переданими аргументами. Цікава річ. Чи вона вам нічого не нагадує? А нагадує він SynchronousQueue, яка лежить в основі cachedThreadPool'а. Для наочності приклад:
public static void main(String[] args) throws InterruptedException {
	SynchronousQueue<String> queue = new SynchronousQueue<>();
	Runnable task = () -> {
		try {
			System.out.println(queue.take());
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	};
	new Thread(task).start();
	queue.put("Message");
}
У прикладі видно, що запустивши новий потік, цей потік піде в очікування, т.к. у черзі буде порожньо. А далі mainпотік покладе на чергу текст "Message". При цьому він сам зупиниться на час, який потрібно, доки не отримають із черги цей текстовий елемент. З цієї теми також можна почитати " SynchronousQueue Vs Exchanger ".

Phaser

І насамкінець найсолодше — Phaser. Нам знадобиться імпорт класу java.util.concurrent.Phaser. Подивимося на простий приклад:
public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser();
        // Вызывая метод register, мы регистрируем текущий поток (main) як участника
        phaser.register();
        System.out.println("Phasecount is " + phaser.getPhase());
        testPhaser(phaser);
        testPhaser(phaser);
        testPhaser(phaser);
        // Через 3 секунды прибываем к барьеру и снимаемся регистрацию. Кол-во прибывших = кол-во регистраций = пуск
        Thread.sleep(3000);
        phaser.arriveAndDeregister();
        System.out.println("Phasecount is " + phaser.getPhase());
    }

    private static void testPhaser(final Phaser phaser) {
        // Говорим, что будет +1 участник на Phaser
        phaser.register();
        // Запускаем новый поток
        new Thread(() -> {
            String name = Thread.currentThread().getName();
            System.out.println(name + " arrived");
            phaser.arriveAndAwaitAdvance(); //threads register arrival to the phaser.
            System.out.println(name + " after passing barrier");
        }).start();
    }
З прикладу видно, що бар'єр при використанні Phaserпроривається, коли кількість реєстрацій збігається з кількістю прибулих до бар'єру. Детальніше можна ознайомитися з Phaser'ом у статті з хабра " Новий синхронізатор Phaser ".

Підсумки

Як очевидно з прикладів, існують різні способи синхронізації потоків. Раніше я постарався вже згадати щось із багатопоточності, сподіваюся, минулі частини були корисні. Кажуть, що шлях до багатопоточності починається з книги Java Concurrency in Practice. Хоча вона вийшла у 2006 році, люди відповідають, що книга є досить фундаментальною і досі тримає удар. Наприклад, можна прочитати обговорення тут: " Is Java Concurrency In Practice still valid? ". Також корисно прочитати посилання із обговорення. Наприклад, там є посилання на книгу " The Well-Grounded Java Developer ", в якій варто звернутися на " Chapter 4. Modern concurrency ". Є ще цілий огляд на цю тему: "Там також є поради з приводу того, що ще слід почитати, щоб дійсно зрозуміти цю тему. Після цього, можна придивитися до такої чудової книги, як OCA OCP JavaSE 8 Programmer Practice Tests . І там є тести в "∫". У цій книзі є як питання, так і відповіді з поясненням. Наприклад: Thread'ом Java не зіпсуєш: Частина VI - До бар'єру!  - 3Багато хто може почати говорити, що це чергове заучування методів. З одного боку - так. З іншого боку, на це питання можна дати відповідь, згадавши, що ExecutorService- це свого роду "апгрейд" Executor'а. А Executorпокликаний просто приховати спосіб створення потоків, але не основний спосіб їх виконання, тобто запуск Runnableу execute(Callable)новому ExecutorServiceпотоці Executor.у просто додали методиsubmit, які вміють повертати Future. Як бачите, ми можемо і завчити список методів, але набагато простіше здогадатися, знаючи природу самих класів. Ну і трохи додаткових матеріалів на тему: #Viacheslav
Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ