Вступ
Ми вже розглядали у першій частині, як створюються потоки. Ще раз згадаємо.
Потік — це
Thread
, у ньому щось запускається
run
, тому скористаємося
tutorialspoint java online compiler'ом і виконаємо наступний код:
public class HelloWorld {
public static void main(String []args){
Runnable task = () -> {
System.out.println("Hello World");
};
new Thread(task).start();
}
}
Єдиний варіант запуску задачі у потоці?
java.util.concurrent.Callable
Виявляється, у
java.lang.Runnable є брат, і звати його
java.util.concurrent.Callable, і з'явився він у Java 1.5. У чому ж різниця? Якщо придивитися до JavaDoc цього інтерфейсу, ми бачимо, що на відміну від
Runnable
, новий інтерфейс оголошує метод
call
, який повертає результат. Крім того, за замовчуванням він throws Exception. Тобто звільняє нас від необхідності для перевірених винятків писати
try-catch
блоки. Уже непогано, правда?
Тепер у нас є замість
Runnable
новий task:
Callable task = () -> {
return "Hello, World!";
};
Але що з ним робити? Навіщо нам взагалі задача, що виконується у потоці, яка повертає результат? Очевидно, що у майбутньому ми розраховуємо отримати результат дій, які в майбутньому будуть виконані. Майбутнє англійською — Future. І інтерфейс є із точно таким іменем:
java.util.concurrent.Future
java.util.concurrent.Future
Інтерфейс
java.util.concurrent.Future описує API для роботи із задачами, результат яких ми плануємо отримати у майбутньому: методи отримання результату, методи перевірки статусу.
Для
Future
нас цікавить його реалізація
java.util.concurrent.FutureTask. Тобто це
Task
, який буде виконаний у
Future
. Чим ця реалізація ще цікава, так це тим, що вона реалізує і
Runnable
. Можна вважати це своєрідним адаптером старої моделі роботи із задачами у потоках і нової моделі (нової в тому сенсі, що вона з'явилася у java 1.5).
Ось приклад:
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class HelloWorld {
public static void main(String []args) throws Exception {
Callable task = () -> {
return "Hello, World!";
};
FutureTask<String> future = new FutureTask<>(task);
new Thread(future).start();
System.out.println(future.get());
}
}
Як видно з прикладу, ми отримуємо за допомогою методу
get
результат із задачі
task
.
(!)Важливо, що у момент отримання результату за допомогою методу
get
виконання стає синхронним. Як ви думаєте, який механізм тут буде використаний? Правильно, немає блоку синхронізації — тому
WAITING в JVisualVM ми побачимо не як
monitor
чи
wait
, а як той самий
park
(тому що використовується механізм
LockSupport
).
Функціональні інтерфейси
Далі піде мова про класи з Java 1.8, тому не зайвим буде зробити коротке введення. Подивімося на наступний код:
Supplier<String> supplier = new Supplier<String>() {
@Override
public String get() {
return "String";
}
};
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
};
Function<String, Integer> converter = new Function<String, Integer>() {
@Override
public Integer apply(String s) {
return Integer.valueOf(s);
}
};
Як же багато зайвого коду, чи не так? Кожен з оголошених класів виконує якусь одну функцію, але для її опису ми використовуємо купу зайвого допоміжного коду. І розробники Java так само подумали. Тому вони ввели набір "функціональних інтерфейсів" (
@FunctionalInterface
) і вирішили, що тепер Java сама буде "додумувати" за нас усе, крім важливого:
Supplier<String> supplier = () -> "String";
Consumer<String> consumer = s -> System.out.println(s);
Function<String, Integer> converter = s -> Integer.valueOf(s);
Supplier
— постачальник. Він не має параметрів, але повертає щось, тобто постачає це.
Consumer
— споживач. Він приймає на вхід щось (параметр s) і з цим щось щось робить, тобто споживає щось.
Є ще функція. Вона приймає на вхід щось (параметр
s
), щось робить і повертає щось. Як ми бачимо, активно використовуються дженерики. У разі невпевненості можна згадати про них і прочитати "
Теорія дженериків у Java або як на практиці ставити дужки".
CompletableFuture
Минув час, і в Java 1.8 з'явився новий клас, який зветься
CompletableFuture
. Він реалізує інтерфейс
Future
, тобто наші
task
будуть виконані у майбутньому, і ми зможемо виконати
get
і отримати результат. Але ще він реалізує деякий
CompletionStage
. З перекладу вже зрозуміло його призначення: це деякий етап (Stage) якихось обчислень. З коротким введенням у тему можна ознайомитися у огляді "
Introduction to CompletionStage and CompletableFuture".
Давайте перейдемо одразу до справи. Подивимося на список доступних статичних методів, які нам допоможуть розпочати:
![Thread'ом Java не зіпсуєш: Частина IV — Callable, Future і друзі - 2]()
Ось варіанти їх використання:
import java.util.concurrent.CompletableFuture;
public class App {
public static void main(String []args) throws Exception {
// CompletableFuture вже містить результат
CompletableFuture<String> completed;
completed = CompletableFuture.completedFuture("Просто значення");
// CompletableFuture, запускаючий (run) новий потік з Runnable, тому він Void
CompletableFuture<Void> voidCompletableFuture;
voidCompletableFuture = CompletableFuture.runAsync(() -> {
System.out.println("run " + Thread.currentThread().getName());
});
// CompletableFuture, запускаючий новий потік, результат якого візьмемо у Supplier
CompletableFuture<String> supplier;
supplier = CompletableFuture.supplyAsync(() -> {
System.out.println("supply " + Thread.currentThread().getName());
return "Значення";
});
}
}
Якщо ми виконаємо цей код, то побачимо, що створення
CompletableFuture
передбачає запуск і всієї ланцюжка. Тому за деякої схожості зі SteamAPI з Java8 у цьому відмінність цих підходів. Наприклад:
List<String> array = Arrays.asList("one", "two");
Stream<String> stringStream = array.stream().map(value -> {
System.out.println("Executed");
return value.toUpperCase();
});
Це приклад Java 8 Stream Api (детальніше можна з ним ознайомитися тут "
Посібник із Java 8 Stream API в картинках і прикладах"). Якщо запустити цей код, то
Executed
не відобразиться. Тобто при створенні стріму у Java стрім не запускається одразу, а чекає, коли з нього захочуть значення. А от
CompletableFuture
запускає ланцюжок на виконання одразу, не чекаючи, що у нього попросять пораховане значення. Вважаю важливим це розуміти.
Отже, у нас є CompletableFuture. Як же ми можемо скласти ланцюжок і які у нас є засоби?
Згадуємо про функціональні інтерфейси, про які ми писали раніше.
- У нас є функція (
Function
), яка приймає А і повертає Б. Має єдиний метод — apply
(застосувати).
- У нас є споживач (
Consumer
), яка приймає А і нічого не повертає (Void). Має єдиний метод — accept
(прийняти).
- У нас є код, який запускається у потоці
Runnable
, який нічого не приймає і не повертає. Має єдиний метод — run
(запустити).
Друге, що треба пам'ятати, це те, що
CompletalbeFuture
у своїй роботі використовує
Runnable
, споживачів і функції. Враховуючи це, ви завжди зможете згадати, що з
CompletableFuture
можна зробити так:
public static void main(String []args) throws Exception {
AtomicLong longValue = new AtomicLong(0);
Runnable task = () -> longValue.set(new Date().getTime());
Function<Long, Date> dateConverter = (longvalue) -> new Date(longvalue);
Consumer<Date> printer = date -> {
System.out.println(date);
System.out.flush();
};
// CompletableFuture computation
CompletableFuture.runAsync(task)
.thenApply((v) -> longValue.get())
.thenApply(dateConverter)
.thenAccept(printer);
}
У методів
thenRun
,
thenApply
та
thenAccept
є версії
Async
. Це означає, що ці стадії будуть виконані у новому потоці. Він буде взятий з особливого пулу, тому заздалегідь невідомо, який потік буде, новий чи попередній. Все залежить від того, наскільки важкі завдання.
Крім цих методів, є ще три цікаві можливості.
Для наочності уявімо, що у нас є якийсь сервіс, який отримує якесь повідомлення звідкись, і на це потрібен час:
public static class NewsService {
public static String getMessage() {
try {
Thread.currentThread().sleep(3000);
return "Message";
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
Тепер, давайте подивимось на інші можливості, які надає
CompletableFuture
. Ми можемо об'єднувати результат
CompletableFuture
з результатом іншого
CompletableFuture
:
Supplier newsSupplier = () -> NewsService.getMessage();
CompletableFuture<String> reader = CompletableFuture.supplyAsync(newsSupplier);
CompletableFuture.completedFuture("!!")
.thenCombine(reader, (a, b) -> b + a)
.thenAccept(result -> System.out.println(result))
.get();
Тут варто звернути увагу, що за замовчуванням потоки будуть демо-потоками, тому для наочності ми використовуємо
get
, щоб дочекатись результату.
А ще ми можемо не лише об'єднати (combine), але й повертати
CompletableFuture
:
CompletableFuture.completedFuture(2L)
.thenCompose((val) -> CompletableFuture.completedFuture(val + 2))
.thenAccept(result -> System.out.println(result));
Тут хочеться зазначити, що для стислості використано метод
CompletableFuture.completedFuture
. Цей метод не створює новий потік, тому інший ланцюжок буде виконаний у тому ж потоці, в якому був викликаний
completedFuture
.
Також є метод
thenAcceptBoth
. Він дуже схожий на
accept
, але якщо
thenAccept
приймає
consumer
, то
thenAcceptBoth
приймає на вхід ще один
CompletableStage
+
BiConsumer
, тобто
consumer
, який на вхід приймає 2 джерела, а не одне.
Є ще цікава можливість зі словом
Either
:
![Thread'ом Java не зіпсуєш: Частина IV — Callable, Future і друзі - 3]()
Ці методи приймають альтернативний
CompletableStage
і будуть виконані на тому
CompletableStage
, який виконається першим.
І завершити цей огляд хочеться ще однією цікавою можливістю
CompletableFuture
— обробкою помилок.
CompletableFuture.completedFuture(2L)
.thenApply((a) -> {
throw new IllegalStateException("помилка");
}).thenApply((a) -> 3L)
//.exceptionally(ex -> 0L)
.thenAccept(val -> System.out.println(val));
Цей код нічого не зробить, оскільки впаде виключення і нічого не буде. Але якщо ми розкоментуємо
exceptionally
, то визначимо поведінку.
На тему
CompletableFuture
раджу також подивитись наступне відео:
На мою скромну думку, ці відео — одні з найнаочніших на просторах інтернету. З них має бути зрозуміло, як це все працює, який у нас є арсенал і навіщо це все потрібно.
Висновок
Сподіваюся, тепер стало зрозуміло, як можна використовувати потоки для отримання обчислень після того, як вони будуть виконані.
Додатковий матеріал:
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ