Вступ
Ми вже розглядали у першій частині, як створюються потоки. Ще раз згадаємо.
Потік — це
Thread, у ньому щось запускається
run, тому скористаємося
tutorialspoint java online compiler'ом і виконаємо наступний код:
public class HelloWorld {
public static void main(String []args){
Runnable task = () -> {
System.out.println("Hello World");
};
new Thread(task).start();
}
}
Єдиний варіант запуску задачі у потоці?
java.util.concurrent.Callable
Виявляється, у
java.lang.Runnable є брат, і звати його
java.util.concurrent.Callable, і з'явився він у Java 1.5. У чому ж різниця? Якщо придивитися до JavaDoc цього інтерфейсу, ми бачимо, що на відміну від
Runnable, новий інтерфейс оголошує метод
call, який повертає результат. Крім того, за замовчуванням він throws Exception. Тобто звільняє нас від необхідності для перевірених винятків писати
try-catch блоки. Уже непогано, правда?
Тепер у нас є замість
Runnable новий task:
Callable task = () -> {
return "Hello, World!";
};
Але що з ним робити? Навіщо нам взагалі задача, що виконується у потоці, яка повертає результат? Очевидно, що у майбутньому ми розраховуємо отримати результат дій, які в майбутньому будуть виконані. Майбутнє англійською — Future. І інтерфейс є із точно таким іменем:
java.util.concurrent.Future
java.util.concurrent.Future
Інтерфейс
java.util.concurrent.Future описує API для роботи із задачами, результат яких ми плануємо отримати у майбутньому: методи отримання результату, методи перевірки статусу.
Для
Future нас цікавить його реалізація
java.util.concurrent.FutureTask. Тобто це
Task, який буде виконаний у
Future. Чим ця реалізація ще цікава, так це тим, що вона реалізує і
Runnable. Можна вважати це своєрідним адаптером старої моделі роботи із задачами у потоках і нової моделі (нової в тому сенсі, що вона з'явилася у java 1.5).
Ось приклад:
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class HelloWorld {
public static void main(String []args) throws Exception {
Callable task = () -> {
return "Hello, World!";
};
FutureTask<String> future = new FutureTask<>(task);
new Thread(future).start();
System.out.println(future.get());
}
}
Як видно з прикладу, ми отримуємо за допомогою методу
get результат із задачі
task.
(!)Важливо, що у момент отримання результату за допомогою методу
get виконання стає синхронним. Як ви думаєте, який механізм тут буде використаний? Правильно, немає блоку синхронізації — тому
WAITING в JVisualVM ми побачимо не як
monitor чи
wait, а як той самий
park (тому що використовується механізм
LockSupport).
Функціональні інтерфейси
Далі піде мова про класи з Java 1.8, тому не зайвим буде зробити коротке введення. Подивімося на наступний код:
Supplier<String> supplier = new Supplier<String>() {
@Override
public String get() {
return "String";
}
};
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
};
Function<String, Integer> converter = new Function<String, Integer>() {
@Override
public Integer apply(String s) {
return Integer.valueOf(s);
}
};
Як же багато зайвого коду, чи не так? Кожен з оголошених класів виконує якусь одну функцію, але для її опису ми використовуємо купу зайвого допоміжного коду. І розробники Java так само подумали. Тому вони ввели набір "функціональних інтерфейсів" (
@FunctionalInterface) і вирішили, що тепер Java сама буде "додумувати" за нас усе, крім важливого:
Supplier<String> supplier = () -> "String";
Consumer<String> consumer = s -> System.out.println(s);
Function<String, Integer> converter = s -> Integer.valueOf(s);
Supplier — постачальник. Він не має параметрів, але повертає щось, тобто постачає це.
Consumer — споживач. Він приймає на вхід щось (параметр s) і з цим щось щось робить, тобто споживає щось.
Є ще функція. Вона приймає на вхід щось (параметр
s), щось робить і повертає щось. Як ми бачимо, активно використовуються дженерики. У разі невпевненості можна згадати про них і прочитати "
Теорія дженериків у Java або як на практиці ставити дужки".
CompletableFuture
Минув час, і в Java 1.8 з'явився новий клас, який зветься
CompletableFuture. Він реалізує інтерфейс
Future, тобто наші
task будуть виконані у майбутньому, і ми зможемо виконати
get і отримати результат. Але ще він реалізує деякий
CompletionStage. З перекладу вже зрозуміло його призначення: це деякий етап (Stage) якихось обчислень. З коротким введенням у тему можна ознайомитися у огляді "
Introduction to CompletionStage and CompletableFuture".
Давайте перейдемо одразу до справи. Подивимося на список доступних статичних методів, які нам допоможуть розпочати:
![Thread'ом Java не зіпсуєш: Частина IV — Callable, Future і друзі - 2]()
Ось варіанти їх використання:
import java.util.concurrent.CompletableFuture;
public class App {
public static void main(String []args) throws Exception {
// CompletableFuture вже містить результат
CompletableFuture<String> completed;
completed = CompletableFuture.completedFuture("Просто значення");
// CompletableFuture, запускаючий (run) новий потік з Runnable, тому він Void
CompletableFuture<Void> voidCompletableFuture;
voidCompletableFuture = CompletableFuture.runAsync(() -> {
System.out.println("run " + Thread.currentThread().getName());
});
// CompletableFuture, запускаючий новий потік, результат якого візьмемо у Supplier
CompletableFuture<String> supplier;
supplier = CompletableFuture.supplyAsync(() -> {
System.out.println("supply " + Thread.currentThread().getName());
return "Значення";
});
}
}
Якщо ми виконаємо цей код, то побачимо, що створення
CompletableFuture передбачає запуск і всієї ланцюжка. Тому за деякої схожості зі SteamAPI з Java8 у цьому відмінність цих підходів. Наприклад:
List<String> array = Arrays.asList("one", "two");
Stream<String> stringStream = array.stream().map(value -> {
System.out.println("Executed");
return value.toUpperCase();
});
Це приклад Java 8 Stream Api (детальніше можна з ним ознайомитися тут "
Посібник із Java 8 Stream API в картинках і прикладах"). Якщо запустити цей код, то
Executed не відобразиться. Тобто при створенні стріму у Java стрім не запускається одразу, а чекає, коли з нього захочуть значення. А от
CompletableFuture запускає ланцюжок на виконання одразу, не чекаючи, що у нього попросять пораховане значення. Вважаю важливим це розуміти.
Отже, у нас є CompletableFuture. Як же ми можемо скласти ланцюжок і які у нас є засоби?
Згадуємо про функціональні інтерфейси, про які ми писали раніше.
- У нас є функція (
Function), яка приймає А і повертає Б. Має єдиний метод — apply (застосувати).
- У нас є споживач (
Consumer), яка приймає А і нічого не повертає (Void). Має єдиний метод — accept (прийняти).
- У нас є код, який запускається у потоці
Runnable, який нічого не приймає і не повертає. Має єдиний метод — run (запустити).
Друге, що треба пам'ятати, це те, що
CompletalbeFuture у своїй роботі використовує
Runnable, споживачів і функції. Враховуючи це, ви завжди зможете згадати, що з
CompletableFuture можна зробити так:
public static void main(String []args) throws Exception {
AtomicLong longValue = new AtomicLong(0);
Runnable task = () -> longValue.set(new Date().getTime());
Function<Long, Date> dateConverter = (longvalue) -> new Date(longvalue);
Consumer<Date> printer = date -> {
System.out.println(date);
System.out.flush();
};
// CompletableFuture computation
CompletableFuture.runAsync(task)
.thenApply((v) -> longValue.get())
.thenApply(dateConverter)
.thenAccept(printer);
}
У методів
thenRun,
thenApply та
thenAccept є версії
Async. Це означає, що ці стадії будуть виконані у новому потоці. Він буде взятий з особливого пулу, тому заздалегідь невідомо, який потік буде, новий чи попередній. Все залежить від того, наскільки важкі завдання.
Крім цих методів, є ще три цікаві можливості.
Для наочності уявімо, що у нас є якийсь сервіс, який отримує якесь повідомлення звідкись, і на це потрібен час:
public static class NewsService {
public static String getMessage() {
try {
Thread.currentThread().sleep(3000);
return "Message";
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
Тепер, давайте подивимось на інші можливості, які надає
CompletableFuture. Ми можемо об'єднувати результат
CompletableFuture з результатом іншого
CompletableFuture:
Supplier newsSupplier = () -> NewsService.getMessage();
CompletableFuture<String> reader = CompletableFuture.supplyAsync(newsSupplier);
CompletableFuture.completedFuture("!!")
.thenCombine(reader, (a, b) -> b + a)
.thenAccept(result -> System.out.println(result))
.get();
Тут варто звернути увагу, що за замовчуванням потоки будуть демо-потоками, тому для наочності ми використовуємо
get, щоб дочекатись результату.
А ще ми можемо не лише об'єднати (combine), але й повертати
CompletableFuture:
CompletableFuture.completedFuture(2L)
.thenCompose((val) -> CompletableFuture.completedFuture(val + 2))
.thenAccept(result -> System.out.println(result));
Тут хочеться зазначити, що для стислості використано метод
CompletableFuture.completedFuture. Цей метод не створює новий потік, тому інший ланцюжок буде виконаний у тому ж потоці, в якому був викликаний
completedFuture.
Також є метод
thenAcceptBoth. Він дуже схожий на
accept, але якщо
thenAccept приймає
consumer, то
thenAcceptBoth приймає на вхід ще один
CompletableStage +
BiConsumer, тобто
consumer, який на вхід приймає 2 джерела, а не одне.
Є ще цікава можливість зі словом
Either:
![Thread'ом Java не зіпсуєш: Частина IV — Callable, Future і друзі - 3]()
Ці методи приймають альтернативний
CompletableStage і будуть виконані на тому
CompletableStage, який виконається першим.
І завершити цей огляд хочеться ще однією цікавою можливістю
CompletableFuture — обробкою помилок.
CompletableFuture.completedFuture(2L)
.thenApply((a) -> {
throw new IllegalStateException("помилка");
}).thenApply((a) -> 3L)
//.exceptionally(ex -> 0L)
.thenAccept(val -> System.out.println(val));
Цей код нічого не зробить, оскільки впаде виключення і нічого не буде. Але якщо ми розкоментуємо
exceptionally, то визначимо поведінку.
На тему
CompletableFuture раджу також подивитись наступне відео:
На мою скромну думку, ці відео — одні з найнаочніших на просторах інтернету. З них має бути зрозуміло, як це все працює, який у нас є арсенал і навіщо це все потрібно.
Висновок
Сподіваюся, тепер стало зрозуміло, як можна використовувати потоки для отримання обчислень після того, як вони будуть виконані.
Додатковий матеріал:
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ