Вступ
Ми вже розглядали у
першій частині , як створюються потоки. Ще раз згадаємо.
Потік - це
Thread
, в ньому щось запускається
run
, тому скористаємося
tutorialspoint java online compiler'ом і виконаємо наступний код:
public class HelloWorld {
public static void main(String []args){
Runnable task = () -> {
System.out.println("Hello World");
};
new Thread(task).start();
}
}
Чи єдиний це варіант запуску завдання в потоці?
java.util.concurrent.Callable
Виявляється, у
java.lang.Runnable є брат і звуть його
java.util.concurrent.Callable і з'явився він на світ Java 1.5. У чому ж різниця? Якщо придивитися до JavaDoc цього інтерфейсу, ми бачимо, що на відміну від
Runnable
, новий інтерфейс оголошує метод
call
, який повертає результат. Крім того, за умовчанням він throws Exception. Тобто позбавляє нас необхідності на виключення, що перевіряються, писати
try-catch
блоки. Вже непогано, правда? Тепер у нас є замість
Runnable
нового task:
Callable task = () -> {
return "Hello, World!";
};
Але що з нею робити? Навіщо нам взагалі завдання, яке виконується в потоці, яке повертає результат? Очевидно, що надалі ми розраховуємо отримати результат дій, які в майбутньому будуть виконані. Майбутнє англійською — Future. І інтерфейс є з таким самим ім'ям:
java.util.concurrent.Future
java.util.concurrent.Future
Інтерфейс
java.util.concurrent.Future описує API для роботи із завданнями, результат яких ми плануємо отримати в майбутньому: методи отримання результату, методи перевірки статусу. Для
Future
нас цікавить його реалізація
java.util.concurrent.FutureTask . Тобто це
Task
, який буде виконано
Future
. Чим ця реалізація ще цікава, то це тим, що вона реалізує і
Runnable
. Можна вважати це свого роду адаптером старої моделі роботи із завданнями в потоках та нової моделі (нової в тому сенсі, що вона з'явилася в java 1.5). Ось приклад:
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class HelloWorld {
public static void main(String []args) throws Exception {
Callable task = () -> {
return "Hello, World!";
};
FutureTask<String> future = new FutureTask<>(task);
new Thread(future).start();
System.out.println(future.get());
}
}
Як видно з прикладу, ми отримуємо за допомогою методу
get
результат із завдання
task
.
(!) Важливо, що у момент отримання результату з допомогою методу
get
виконання стає синхронним. Як ви вважаєте, який механізм тут буде використаний? Правильно, немає блоку синхронізації - тому
WAITING в JVisualVM ми побачимо не як
monitor
або
wait
, а як цей
park
(т.к. використовується механізм
LockSupport
).
Функціональні інтерфейси
Далі йтиметься про класи з Java 1.8, тому не зайвим буде зробити короткий вступ. Подивимося на наступний код:
Supplier<String> supplier = new Supplier<String>() {
@Override
public String get() {
return "String";
}
};
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
};
Function<String, Integer> converter = new Function<String, Integer>() {
@Override
public Integer apply(String s) {
return Integer.valueOf(s);
}
};
Як же багато зайвого коду, чи не так? Кожен з класів, що оголошуються, виконує якусь одну функцію, але для її опису ми використовуємо купу зайвого допоміжного коду. І розробники Java так само подумали. Тому вони ввели набір "функціональних інтерфейсів" (
@FunctionalInterface
) і вирішабо, що тепер Java сама "додумуватиме" за нас все, крім важливого:
Supplier<String> supplier = () -> "String";
Consumer<String> consumer = s -> System.out.println(s);
Function<String, Integer> converter = s -> Integer.valueOf(s);
Supplier
- Постачальник. Він не має параметрів, але повертає щось, тобто постачає це.
Consumer
- Споживач. Він приймає на вхід щось (параметр s) і з цим щось робить, тобто споживає щось. Є ще функція. Вона приймає на вхід щось (параметр
s
), щось робить і щось повертає. Як бачимо, активно використовуються дженерики. У разі невпевненості можна згадати про них і прочитати "
Теорія дженериків у Java або як на практиці ставити дужки ".
CompletableFuture
Минав час, і в Java 1.8 з'явився новий клас, який зветься
CompletableFuture
. Він реалізує інтерфейс
Future
, тобто наші
task
будуть виконані в майбутньому, і ми зможемо виконати
get
та отримати результат. Але ще він реалізує деякий
CompletionStage
. З перекладу зрозуміло його призначення: це якийсь етап (Stage) якихось обчислень. З коротким введенням у тему можна ознайомитись у огляді "
Introduction to CompletionStage and CompletableFuture ". Давайте перейдемо одразу до справи. Подивімося на список доступних статичних методів, які нам допоможуть розпочати:
Ось варіанти їх використання:
import java.util.concurrent.CompletableFuture;
public class App {
public static void main(String []args) throws Exception {
CompletableFuture<String> completed;
completed = CompletableFuture.completedFuture("Просто значення");
CompletableFuture<Void> voidCompletableFuture;
voidCompletableFuture = CompletableFuture.runAsync(() -> {
System.out.println("run " + Thread.currentThread().getName());
});
CompletableFuture<String> supplier;
supplier = CompletableFuture.supplyAsync(() -> {
System.out.println("supply " + Thread.currentThread().getName());
return "Значение";
});
}
}
Якщо ми виконаємо цей код, то побачимо, що створення
CompletableFuture
передбачає запуск і всього ланцюжка. Тому при певній схожості зі SteamAPI з Java8 у цьому відмінність цих підходів. Наприклад:
List<String> array = Arrays.asList("one", "two");
Stream<String> stringStream = array.stream().map(value -> {
System.out.println("Executed");
return value.toUpperCase();
});
Це приклад Java 8 Stream Api (докладніше можна з ним ознайомитися тут "
Посібник з Java 8 Stream API в картинках і прикладах "). Якщо запустити цей код,
Executed
не з'явиться. Тобто при створенні стриму Java стрім не запускається відразу, а чекає, коли з нього захочуть значення. А ось
CompletableFuture
запускає ланцюжок на виконання одразу, не чекаючи на те, що в нього попросять пораховане значення. Вважаю важливим це розуміти. Отже, ми маємо CompletableFuture. Як же ми можемо скласти ланцюжок і які ми маємо кошти? Згадаймо про функціональні інтерфейси, про які ми писали раніше.
- Ми маємо функцію (
Function
), яка приймає А і повертає Б. Має єдиний метод — apply
(застосувати).
- У нас є споживач (
Consumer
), який приймає А і нічого не повертає (Void). Має єдиний метод – accept
(прийняти).
- У нас є код, що запускається в потоці
Runnable
, який не приймає і не повертає. Має єдиний метод run
(запустити).
Друге, що треба пам'ятати, що
CompletalbeFuture
у своїй роботі використовує
Runnable
, споживачів та функції. Враховуючи це, ви завжди зможете згадати, що
CompletableFuture
можна робити так:
public static void main(String []args) throws Exception {
AtomicLong longValue = new AtomicLong(0);
Runnable task = () -> longValue.set(new Date().getTime());
Function<Long, Date> dateConverter = (longvalue) -> new Date(longvalue);
Consumer<Date> printer = date -> {
System.out.println(date);
System.out.flush();
};
CompletableFuture.runAsync(task)
.thenApply((v) -> longValue.get())
.thenApply(dateConverter)
.thenAccept(printer);
}
У методів
thenRun
,
thenApply
і
thenAccept
є версії
Async
. Це означає, що ці стадії будуть виконані у новому потоці. Його буде взято з особливого пулу, тому заздалегідь невідомо, який потік буде, новий чи колишній. Все залежить від того, наскільки важкі завдання. Крім цих методів, є ще три цікаві можливості. Для наочності уявімо, що у нас є якийсь сервіс, який отримує якесь повідомлення звідкись і на це потрібен час:
public static class NewsService {
public static String getMessage() {
try {
Thread.currentThread().sleep(3000);
return "Message";
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
Тепер, давайте подивимося на інші можливості, які надає
CompletableFuture
. Ми можемо поєднувати результат
CompletableFuture
з результатом іншого
CompletableFuture
:
Supplier newsSupplier = () -> NewsService.getMessage();
CompletableFuture<String> reader = CompletableFuture.supplyAsync(newsSupplier);
CompletableFuture.completedFuture("!!")
.thenCombine(reader, (a, b) -> b + a)
.thenAccept(result -> System.out.println(result))
.get();
Тут варто звернути увагу, що за замовчуванням потоки будуть демон-потоками, тому для наочності ми використовуємо
get
для того, щоб дочекатися результату. А ще ми можемо не тільки об'єднати (combine), але й повертати
CompletableFuture
:
CompletableFuture.completedFuture(2L)
.thenCompose((val) -> CompletableFuture.completedFuture(val + 2))
.thenAccept(result -> System.out.println(result));
Тут хочеться відзначити, що для стислості використаний метод
CompletableFuture.completedFuture
. Даний метод не створює новий потік, тому решта ланцюжка буде виконана в тому ж потоці, в якому був викликаний
completedFuture
. Також є метод
thenAcceptBoth
. Він дуже схожий на
accept
, але якщо
thenAccept
приймає
consumer
, то
thenAcceptBoth
приймає на вхід ще один
CompletableStage
+
BiConsumer
, тобто
consumer
який на вхід приймає 2 джерела, а не один. Є ще цікава можливість зі словом
Either
:
Дані методи приймають альтернативний
CompletableStage
і будуть виконані на тому
CompletableStage
, який першим виконається. І закінчити цей огляд хочеться ще однією цікавою можливістю
CompletableFuture
обробкою помилок.
CompletableFuture.completedFuture(2L)
.thenApply((a) -> {
throw new IllegalStateException("error");
}).thenApply((a) -> 3L)
.thenAccept(val -> System.out.println(val));
Цей код нічого не зробить, т.к. впаде виняток і нічого не буде. Але якщо ми розкоментуємо
exceptionally
, то визначимо поведінку. На тему
CompletableFuture
раджу також подивитись наступне відео:
На мою скромну думку, дані відео – одні з найнаочніших на просторах інтернету. З них має бути зрозумілим, як це все працює, який у нас є арсенал і навіщо це все потрібно.
Висновок
Сподіваюся тепер стало зрозуміло, як можна використовувати потоки для отримання обчислень після того, як вони будуть обчислені. Додатковий матеріал:
#Viacheslav
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ