Вступ
Потоки — штука цікава. У минулих оглядах ми розглянули деякі доступні засоби для реалізації багатопоточності.Частина I — потоки. Багатопотоковість у Java закладється з перших днів її існування. Давай для початку згадаємо, звідки беруться потоки, як вони загалом організовані.
Частина 2 — синхронізация. Присвячена базовим засобам синхронізації між потоками. Розглянемо, що таке монітор, лок, синхронізація.
Частина 3 — взаємодія. Огляд особливостей взаємодії потоків. Розглянемо, які проблеми можуть з'явитися при взаємодії потоків і поговоримо про те, як їх можна уникнути.
Частина 4 — Callable, Future і друзі. Ця частина розповість, як працювати з обчисленнями у потоках і які засоби для цього з'явились у Java 1.8. Навіщо потрібен інтерфейс Future і його реалізація CompletableFuture.
Частина 5 — Executor, ThreadPool, Fork Join. Тут розглянемо Executor'и, пули потоків і Fork Join Framework. Ти дізнаєшся, як цим усім користуватисяі що ще додатково почитати.
Частина 6 — до бар'єру! У фінальній частині розглянемо доступні в Java синхронізатори і підведемо підсумки.
Давайте подивимося, що ми можемо зробити ще цікавого. На цей момент ми багато чого знаємо. Щоб нагадати, можемо скористатися Tutorialspoint Java Online Compiler'ом:
public static void main(String []args){
Runnable task = () -> {
Thread thread = Thread.currentThread();
System.out.println("Привіт від " + thread.getName());
};
Thread thread = new Thread(task);
thread.start();
}
Також ми знаємо про те, що у нас є таке поняття, як лок. Про нього ми читали у "Thread'ом Java не зіпсуєш: Частина II — синхронізація". Потік може займати лок і тоді інший потік, який спробує зайняти лок, буде змушений чекати звільнення лока:
import java.util.concurrent.locks.*;
public class HelloWorld{
public static void main(String []args){
Lock lock = new ReentrantLock();
Runnable task = () -> {
lock.lock();
Thread thread = Thread.currentThread();
System.out.println("Привіт від " + thread.getName());
lock.unlock();
};
Thread thread = new Thread(task);
thread.start();
}
}
Думаю, час поговорити про те, що ми ще можемо зробити цікавого.
Семафори
Найпростіший засіб контролю за тим, скільки потоків можуть одночасно працювати — семафор. Як на залізниці. Горить зелений — можна. Горить червоний — чекаємо. Що ми чекаємо від семафора? Дозволу. Дозвіл англійською — permit. Щоб отримати дозвіл — його потрібно отримати, що англійською буде acquire. А коли дозвіл більше не потрібен, ми його повинні віддати, тобто звільнити його або позбутися його, що англійською буде release. Подивимося, як це працює. Нам знадобиться імпорт класуjava.util.concurrent.Semaphore.
Приклад:
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(0);
Runnable task = () -> {
try {
semaphore.acquire();
System.out.println("Завершено");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
new Thread(task).start();
Thread.sleep(5000);
semaphore.release(1);
}
Як бачимо, запам'ятавши англійські слова, ми розуміємо, як працює семафор. Цікаво, що головна умова — на "рахунку" семафора має бути позитивна кількість permit'ів. Тому ініціювати його можна і з мінусом. І запитувати (acquire) можна більше, ніж 1.
CountDownLatch
Наступний механізм —CountDownLatch. CountDown англійською — це відлік, а Latch — засувка або защіпка. Тобто, якщо перекладати, то це защіпка з відліком.
Тут нам знадобиться відповідний імпорт класу java.util.concurrent.CountDownLatch.
Це схоже на біг або гонки, коли всі збираються на стартовій лінії і коли всі готові — дають дозвіл, і всі одночасно стартують. Приклад:
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(3);
Runnable task = () -> {
try {
countDownLatch.countDown();
System.out.println("Countdown: " + countDownLatch.getCount());
countDownLatch.await();
System.out.println("Завершено");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
for (int i = 0; i < 3; i++) {
new Thread(task).start();
}
}
await англійською — очікувати. Тобто ми спочатку кажемо countDown. Як каже гугл перекладач, count down — "an act of counting numerals in reverse order to zero", тобто виконати дію зворотного відліку, ціль якого — дорахувати до нуля. А далі кажемо await — тобто очікувати, поки значення лічильника не стане нуль.
Цікаво, що такий лічильник — одноразовий. Як сказано в JavaDoc — "When threads must repeatedly count down in this way, instead use a CyclicBarrier", тобто, якщо потрібен багаторазовий рахунок — треба використовувати інший варіант, який називається CyclicBarrier.
CyclicBarrier
Як і слід з назви,CyclicBarrier — це циклічний бар'єр.
Нам знадобиться імпорт класу java.util.concurrent.CyclicBarrier.
Подивимося на приклад:
public static void main(String[] args) throws InterruptedException {
Runnable action = () -> System.out.println("На старт!");
CyclicBarrier berrier = new CyclicBarrier(3, action);
Runnable task = () -> {
try {
berrier.await();
System.out.println("Завершено");
} catch (BrokenBarrierException | InterruptedException e) {
e.printStackTrace();
}
};
System.out.println("Ліміт: " + berrier.getParties());
for (int i = 0; i < 3; i++) {
new Thread(task).start();
}
}
Як бачимо, потік виконує await, тобто очікує. При цьому зменшується значення бар'єра. Бар'єр вважається зламаним (berrier.isBroken()), коли відлік дійшов до нуля.
Щоб скинути бар'єр, потрібно викликати berrier.reset(), чого не вистачало в CountDownLatch.
Exchanger
Наступний засіб —Exchanger. Exchange з англійської перекладається як обмін або обмінюватися. А Exchanger — обмінник, тобто те, через що обмінюються. Подивимося на найпростіший приклад:
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
Runnable task = () -> {
try {
Thread thread = Thread.currentThread();
String withThreadName = exchanger.exchange(thread.getName());
System.out.println(thread.getName() + " обмінявся з " + withThreadName);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
new Thread(task).start();
new Thread(task).start();
}
Тут ми запускаємо два потоки. Кожен з них виконує метод exchange і очікує, коли інший потік також виконає метод exchange. Таким чином, потоки обміняються між собою переданими аргументами.
Цікава штука. Нічого вам не нагадує?
А нагадує вона SynchronousQueue, яка лежить в основі cachedThreadPool'а.
Для наочності — приклад:
public static void main(String[] args) throws InterruptedException {
SynchronousQueue<String> queue = new SynchronousQueue<>();
Runnable task = () -> {
try {
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
};
new Thread(task).start();
queue.put("Повідомлення");
}
У прикладі видно, що запустивши новий потік, даний потік піде в очікування, бо в черзі буде порожньо. А далі main потік покладе в чергу текст "Повідомлення". При цьому він сам зупиниться на час, поки не отримають із черги цей текстовий елемент.
На цю тему також можна почитати "SynchronousQueue Vs Exchanger".
Phaser
І наостанок найсолодше —Phaser.
Нам знадобиться імпорт класу java.util.concurrent.Phaser. Подивимося на простий приклад:
public static void main(String[] args) throws InterruptedException {
Phaser phaser = new Phaser();
// Викликаючи метод register, ми реєструємо потік (main) як учасника
phaser.register();
System.out.println("Phasecount is " + phaser.getPhase());
testPhaser(phaser);
testPhaser(phaser);
testPhaser(phaser);
// Через 3 секунди доходимо до бар'єра і знімаємо реєстрацію. Кількість прибулих = кількість реєстрацій = старт
Thread.sleep(3000);
phaser.arriveAndDeregister();
System.out.println("Phasecount is " + phaser.getPhase());
}
private static void testPhaser(final Phaser phaser) {
// Кажемо, що буде +1 учасник на Phaser
phaser.register();
// Запускаємо новий потік
new Thread(() -> {
String name = Thread.currentThread().getName();
System.out.println(name + " arrived");
phaser.arriveAndAwaitAdvance(); //threads register arrival to the phaser.
System.out.println(name + " after passing barrier");
}).start();
}
З прикладу видно, що бар'єр при використанні Phaser'а проривається, коли кількість реєстрацій збігається з кількістю прибулих до бар'єра.
Детальніше можна ознайомитись із Phaser'ом у статті на хабрі "Новий синхронізатор Phaser".
Підсумки
Як видно з прикладів, існують різні способи синхронізації потоків. Раніше я спробував вже згадати щось із багатопоточності, сподіваюсь попередні частини були корисними. Кажуть, що шлях до багатопоточності починається з книги "Java Concurrency in Practice". Хоча вона вийшла у 2006 році, люди відповідають, що книга досить фундаментальна і досі актуальна. Наприклад, можна прочитати обговорення тут: "Is Java Concurrency In Practice still valid?". Також корисно прочитати посилання з обговорення. Наприклад, там є посилання на книгу "The Well-Grounded Java Developer", в якій варто звернути увагу на "Chapter 4. Modern concurrency". Є ще цілий огляд на цю ж тему: "Is Java concurrency in practice still relevant in era of java 8". Там також є поради щодо того, що ще слід прочитати, щоб дійсно зрозуміти цю тему. Після цього, можна придивитися до такої чудової книги, як "OCA OCP JavaSE 8 Programmer Practice Tests". Нас цікавить друга частина, тобто OCP. І там є тести в "∫". У цій книзі є як питання, так і відповіді з поясненням. Наприклад:
Багато хто може почати говорити, що це чергове завчання методів. З одного боку — так. З іншого боку, на це питання можна дати відповідь, згадавши, що ExecutorService — це своєрідний "апгрейд" Executor'а. А Executor покликаний просто приховати спосіб створення потоків, але не основний спосіб їх виконання, тобто запуск у новому потоці Runnable. Тому execute(Callable) і немає, тому що в ExecutorService до Executor'а просто додали методи submit, які вміють повертати Future. Як бачите, ми можемо і завчити список методів, але значно простіше здогадатися, знаючи природу самих класів.
Ну і трохи додаткових матеріалів по темі:
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ