Вступ

Потоки — штука цікава. У минулих оглядах ми розглянули деякі доступні засоби для реалізації багатопоточності.

Частина I — потоки. Багатопотоковість у Java закладється з перших днів її існування. Давай для початку згадаємо, звідки беруться потоки, як вони загалом організовані.

Частина 2 — синхронізация. Присвячена базовим засобам синхронізації між потоками. Розглянемо, що таке монітор, лок, синхронізація.

Частина 3 — взаємодія. Огляд особливостей взаємодії потоків. Розглянемо, які проблеми можуть з'явитися при взаємодії потоків і поговоримо про те, як їх можна уникнути.

Частина 4 — Callable, Future і друзі. Ця частина розповість, як працювати з обчисленнями у потоках і які засоби для цього з'явились у Java 1.8. Навіщо потрібен інтерфейс Future і його реалізація CompletableFuture.

Частина 5 — Executor, ThreadPool, Fork Join. Тут розглянемо Executor'и, пули потоків і Fork Join Framework. Ти дізнаєшся, як цим усім користуватисяі що ще додатково почитати.

Частина 6 — до бар'єру! У фінальній частині розглянемо доступні в Java синхронізатори і підведемо підсумки.

Давайте подивимося, що ми можемо зробити ще цікавого. На цей момент ми багато чого знаємо. Щоб нагадати, можемо скористатися Tutorialspoint Java Online Compiler'ом:

public static void main(String []args){
	Runnable task = () -> {
 		Thread thread = Thread.currentThread();
		System.out.println("Привіт від " + thread.getName());
	};
	Thread thread = new Thread(task);
	thread.start();
}
Також ми знаємо про те, що у нас є таке поняття, як лок. Про нього ми читали у "Thread'ом Java не зіпсуєш: Частина II — синхронізація". Потік може займати лок і тоді інший потік, який спробує зайняти лок, буде змушений чекати звільнення лока:

import java.util.concurrent.locks.*;

public class HelloWorld{
	public static void main(String []args){
		Lock lock = new ReentrantLock();
		Runnable task = () -> {
			lock.lock();
			Thread thread = Thread.currentThread();
			System.out.println("Привіт від " + thread.getName());
			lock.unlock();
		};
		Thread thread = new Thread(task);
		thread.start();
	}
}
Думаю, час поговорити про те, що ми ще можемо зробити цікавого.

Семафори

Найпростіший засіб контролю за тим, скільки потоків можуть одночасно працювати — семафор. Як на залізниці. Горить зелений — можна. Горить червоний — чекаємо. Що ми чекаємо від семафора? Дозволу. Дозвіл англійською — permit. Щоб отримати дозвіл — його потрібно отримати, що англійською буде acquire. А коли дозвіл більше не потрібен, ми його повинні віддати, тобто звільнити його або позбутися його, що англійською буде release. Подивимося, як це працює. Нам знадобиться імпорт класу java.util.concurrent.Semaphore. Приклад:

public static void main(String[] args) throws InterruptedException {
	Semaphore semaphore = new Semaphore(0);
	Runnable task = () -> {
		try {
			semaphore.acquire();
			System.out.println("Завершено");
			semaphore.release();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	};
	new Thread(task).start();
	Thread.sleep(5000);
	semaphore.release(1);
}
Як бачимо, запам'ятавши англійські слова, ми розуміємо, як працює семафор. Цікаво, що головна умова — на "рахунку" семафора має бути позитивна кількість permit'ів. Тому ініціювати його можна і з мінусом. І запитувати (acquire) можна більше, ніж 1.

CountDownLatch

Наступний механізм — CountDownLatch. CountDown англійською — це відлік, а Latch — засувка або защіпка. Тобто, якщо перекладати, то це защіпка з відліком. Тут нам знадобиться відповідний імпорт класу java.util.concurrent.CountDownLatch. Це схоже на біг або гонки, коли всі збираються на стартовій лінії і коли всі готові — дають дозвіл, і всі одночасно стартують. Приклад:

public static void main(String[] args) {
	CountDownLatch countDownLatch = new CountDownLatch(3);
	Runnable task = () -> {
		try {
			countDownLatch.countDown();
			System.out.println("Countdown: " + countDownLatch.getCount());
			countDownLatch.await();
			System.out.println("Завершено");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	};
	for (int i = 0; i < 3; i++) {
		new Thread(task).start();
 	}
}
await англійською — очікувати. Тобто ми спочатку кажемо countDown. Як каже гугл перекладач, count down — "an act of counting numerals in reverse order to zero", тобто виконати дію зворотного відліку, ціль якого — дорахувати до нуля. А далі кажемо await — тобто очікувати, поки значення лічильника не стане нуль. Цікаво, що такий лічильник — одноразовий. Як сказано в JavaDoc — "When threads must repeatedly count down in this way, instead use a CyclicBarrier", тобто, якщо потрібен багаторазовий рахунок — треба використовувати інший варіант, який називається CyclicBarrier.

CyclicBarrier

Як і слід з назви, CyclicBarrier — це циклічний бар'єр. Нам знадобиться імпорт класу java.util.concurrent.CyclicBarrier. Подивимося на приклад:

public static void main(String[] args) throws InterruptedException {
	Runnable action = () -> System.out.println("На старт!");
	CyclicBarrier berrier = new CyclicBarrier(3, action);
	Runnable task = () -> {
		try {
			berrier.await();
			System.out.println("Завершено");
		} catch (BrokenBarrierException | InterruptedException e) {
			e.printStackTrace();
		}
	};
	System.out.println("Ліміт: " + berrier.getParties());
	for (int i = 0; i < 3; i++) {
		new Thread(task).start();
	}
}
Як бачимо, потік виконує await, тобто очікує. При цьому зменшується значення бар'єра. Бар'єр вважається зламаним (berrier.isBroken()), коли відлік дійшов до нуля. Щоб скинути бар'єр, потрібно викликати berrier.reset(), чого не вистачало в CountDownLatch.

Exchanger

Наступний засіб — Exchanger. Exchange з англійської перекладається як обмін або обмінюватися. А Exchanger — обмінник, тобто те, через що обмінюються. Подивимося на найпростіший приклад:

public static void main(String[] args) {
	Exchanger<String> exchanger = new Exchanger<>();
	Runnable task = () -> {
		try {
			Thread thread = Thread.currentThread();
			String withThreadName = exchanger.exchange(thread.getName());
			System.out.println(thread.getName() + " обмінявся з " + withThreadName);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	};
	new Thread(task).start();
	new Thread(task).start();
}
Тут ми запускаємо два потоки. Кожен з них виконує метод exchange і очікує, коли інший потік також виконає метод exchange. Таким чином, потоки обміняються між собою переданими аргументами. Цікава штука. Нічого вам не нагадує? А нагадує вона SynchronousQueue, яка лежить в основі cachedThreadPool'а. Для наочності — приклад:

public static void main(String[] args) throws InterruptedException {
	SynchronousQueue<String> queue = new SynchronousQueue<>();
	Runnable task = () -> {
		try {
			System.out.println(queue.take());
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	};
	new Thread(task).start();
	queue.put("Повідомлення");
}
У прикладі видно, що запустивши новий потік, даний потік піде в очікування, бо в черзі буде порожньо. А далі main потік покладе в чергу текст "Повідомлення". При цьому він сам зупиниться на час, поки не отримають із черги цей текстовий елемент. На цю тему також можна почитати "SynchronousQueue Vs Exchanger".

Phaser

І наостанок найсолодше — Phaser. Нам знадобиться імпорт класу java.util.concurrent.Phaser. Подивимося на простий приклад:

public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser();
        // Викликаючи метод register, ми реєструємо потік (main) як учасника
        phaser.register();
        System.out.println("Phasecount is " + phaser.getPhase());
        testPhaser(phaser);
        testPhaser(phaser);
        testPhaser(phaser);
        // Через 3 секунди доходимо до бар'єра і знімаємо реєстрацію. Кількість прибулих = кількість реєстрацій = старт
        Thread.sleep(3000);
        phaser.arriveAndDeregister();
        System.out.println("Phasecount is " + phaser.getPhase());
    }

    private static void testPhaser(final Phaser phaser) {
        // Кажемо, що буде +1 учасник на Phaser
        phaser.register();
        // Запускаємо новий потік
        new Thread(() -> {
            String name = Thread.currentThread().getName();
            System.out.println(name + " arrived");
            phaser.arriveAndAwaitAdvance(); //threads register arrival to the phaser.
            System.out.println(name + " after passing barrier");
        }).start();
    }
З прикладу видно, що бар'єр при використанні Phaser'а проривається, коли кількість реєстрацій збігається з кількістю прибулих до бар'єра. Детальніше можна ознайомитись із Phaser'ом у статті на хабрі "Новий синхронізатор Phaser".

Підсумки

Як видно з прикладів, існують різні способи синхронізації потоків. Раніше я спробував вже згадати щось із багатопоточності, сподіваюсь попередні частини були корисними. Кажуть, що шлях до багатопоточності починається з книги "Java Concurrency in Practice". Хоча вона вийшла у 2006 році, люди відповідають, що книга досить фундаментальна і досі актуальна. Наприклад, можна прочитати обговорення тут: "Is Java Concurrency In Practice still valid?". Також корисно прочитати посилання з обговорення. Наприклад, там є посилання на книгу "The Well-Grounded Java Developer", в якій варто звернути увагу на "Chapter 4. Modern concurrency". Є ще цілий огляд на цю ж тему: "Is Java concurrency in practice still relevant in era of java 8". Там також є поради щодо того, що ще слід прочитати, щоб дійсно зрозуміти цю тему. Після цього, можна придивитися до такої чудової книги, як "OCA OCP JavaSE 8 Programmer Practice Tests". Нас цікавить друга частина, тобто OCP. І там є тести в "∫". У цій книзі є як питання, так і відповіді з поясненням. Наприклад: Thread'ом Java не зіпсуєш: Частина VI — До бар'єра! - 3Багато хто може почати говорити, що це чергове завчання методів. З одного боку — так. З іншого боку, на це питання можна дати відповідь, згадавши, що ExecutorService — це своєрідний "апгрейд" Executor'а. А Executor покликаний просто приховати спосіб створення потоків, але не основний спосіб їх виконання, тобто запуск у новому потоці Runnable. Тому execute(Callable) і немає, тому що в ExecutorService до Executor'а просто додали методи submit, які вміють повертати Future. Як бачите, ми можемо і завчити список методів, але значно простіше здогадатися, знаючи природу самих класів. Ну і трохи додаткових матеріалів по темі: