Вступ

Ми вже розглядали у першій частині, як створюються потоки. Ще раз згадаємо. Потік — це Thread, у ньому щось запускається run, тому скористаємося tutorialspoint java online compiler'ом і виконаємо наступний код:

public class HelloWorld {
    
    public static void main(String []args){
        Runnable task = () -> {
            System.out.println("Hello World");
        };
        new Thread(task).start();
    }
}
Єдиний варіант запуску задачі у потоці?

java.util.concurrent.Callable

Виявляється, у java.lang.Runnable є брат, і звати його java.util.concurrent.Callable, і з'явився він у Java 1.5. У чому ж різниця? Якщо придивитися до JavaDoc цього інтерфейсу, ми бачимо, що на відміну від Runnable, новий інтерфейс оголошує метод call, який повертає результат. Крім того, за замовчуванням він throws Exception. Тобто звільняє нас від необхідності для перевірених винятків писати try-catch блоки. Уже непогано, правда? Тепер у нас є замість Runnable новий task:

Callable task = () -> {
	return "Hello, World!";
};
Але що з ним робити? Навіщо нам взагалі задача, що виконується у потоці, яка повертає результат? Очевидно, що у майбутньому ми розраховуємо отримати результат дій, які в майбутньому будуть виконані. Майбутнє англійською — Future. І інтерфейс є із точно таким іменем: java.util.concurrent.Future

java.util.concurrent.Future

Інтерфейс java.util.concurrent.Future описує API для роботи із задачами, результат яких ми плануємо отримати у майбутньому: методи отримання результату, методи перевірки статусу. Для Future нас цікавить його реалізація java.util.concurrent.FutureTask. Тобто це Task, який буде виконаний у Future. Чим ця реалізація ще цікава, так це тим, що вона реалізує і Runnable. Можна вважати це своєрідним адаптером старої моделі роботи із задачами у потоках і нової моделі (нової в тому сенсі, що вона з'явилася у java 1.5). Ось приклад:

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class HelloWorld {
    
    public static void main(String []args) throws Exception {
        Callable task = () -> {
            return "Hello, World!";
        };
        FutureTask<String> future = new FutureTask<>(task);
        new Thread(future).start();
        System.out.println(future.get());
    }
}
Як видно з прикладу, ми отримуємо за допомогою методу get результат із задачі task. (!)Важливо, що у момент отримання результату за допомогою методу get виконання стає синхронним. Як ви думаєте, який механізм тут буде використаний? Правильно, немає блоку синхронізації — тому WAITING в JVisualVM ми побачимо не як monitor чи wait, а як той самий park (тому що використовується механізм LockSupport).

Функціональні інтерфейси

Далі піде мова про класи з Java 1.8, тому не зайвим буде зробити коротке введення. Подивімося на наступний код:

Supplier<String> supplier = new Supplier<String>() {
	@Override
	public String get() {
		return "String";
	}
};
Consumer<String> consumer = new Consumer<String>() {
	@Override
	public void accept(String s) {
		System.out.println(s);
	}
};
Function<String, Integer> converter = new Function<String, Integer>() {
	@Override
	public Integer apply(String s) {
		return Integer.valueOf(s);
	}
};
Як же багато зайвого коду, чи не так? Кожен з оголошених класів виконує якусь одну функцію, але для її опису ми використовуємо купу зайвого допоміжного коду. І розробники Java так само подумали. Тому вони ввели набір "функціональних інтерфейсів" (@FunctionalInterface) і вирішили, що тепер Java сама буде "додумувати" за нас усе, крім важливого:

Supplier<String> supplier = () -> "String";
Consumer<String> consumer = s -> System.out.println(s);
Function<String, Integer> converter = s -> Integer.valueOf(s);
Supplier — постачальник. Він не має параметрів, але повертає щось, тобто постачає це. Consumer — споживач. Він приймає на вхід щось (параметр s) і з цим щось щось робить, тобто споживає щось. Є ще функція. Вона приймає на вхід щось (параметр s), щось робить і повертає щось. Як ми бачимо, активно використовуються дженерики. У разі невпевненості можна згадати про них і прочитати "Теорія дженериків у Java або як на практиці ставити дужки".

CompletableFuture

Минув час, і в Java 1.8 з'явився новий клас, який зветься CompletableFuture. Він реалізує інтерфейс Future, тобто наші task будуть виконані у майбутньому, і ми зможемо виконати get і отримати результат. Але ще він реалізує деякий CompletionStage. З перекладу вже зрозуміло його призначення: це деякий етап (Stage) якихось обчислень. З коротким введенням у тему можна ознайомитися у огляді "Introduction to CompletionStage and CompletableFuture". Давайте перейдемо одразу до справи. Подивимося на список доступних статичних методів, які нам допоможуть розпочати: Thread'ом Java не зіпсуєш: Частина IV — Callable, Future і друзі - 2Ось варіанти їх використання:

import java.util.concurrent.CompletableFuture;
public class App {
    public static void main(String []args) throws Exception {
        // CompletableFuture вже містить результат
        CompletableFuture<String> completed;
        completed = CompletableFuture.completedFuture("Просто значення");
        // CompletableFuture, запускаючий (run) новий потік з Runnable, тому він Void
        CompletableFuture<Void> voidCompletableFuture;
        voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("run " + Thread.currentThread().getName());
        });
        // CompletableFuture, запускаючий новий потік, результат якого візьмемо у Supplier 
        CompletableFuture<String> supplier;
        supplier = CompletableFuture.supplyAsync(() -> {
            System.out.println("supply " + Thread.currentThread().getName());
            return "Значення";
        });
    }
}
Якщо ми виконаємо цей код, то побачимо, що створення CompletableFuture передбачає запуск і всієї ланцюжка. Тому за деякої схожості зі SteamAPI з Java8 у цьому відмінність цих підходів. Наприклад:

List<String> array = Arrays.asList("one", "two");
Stream<String> stringStream = array.stream().map(value -> {
	System.out.println("Executed");
	return value.toUpperCase();
});
Це приклад Java 8 Stream Api (детальніше можна з ним ознайомитися тут "Посібник із Java 8 Stream API в картинках і прикладах"). Якщо запустити цей код, то Executed не відобразиться. Тобто при створенні стріму у Java стрім не запускається одразу, а чекає, коли з нього захочуть значення. А от CompletableFuture запускає ланцюжок на виконання одразу, не чекаючи, що у нього попросять пораховане значення. Вважаю важливим це розуміти. Отже, у нас є CompletableFuture. Як же ми можемо скласти ланцюжок і які у нас є засоби? Згадуємо про функціональні інтерфейси, про які ми писали раніше.
  • У нас є функція (Function), яка приймає А і повертає Б. Має єдиний метод — apply (застосувати).
  • У нас є споживач (Consumer), яка приймає А і нічого не повертає (Void). Має єдиний метод — accept (прийняти).
  • У нас є код, який запускається у потоці Runnable, який нічого не приймає і не повертає. Має єдиний метод — run (запустити).
Друге, що треба пам'ятати, це те, що CompletalbeFuture у своїй роботі використовує Runnable, споживачів і функції. Враховуючи це, ви завжди зможете згадати, що з CompletableFuture можна зробити так:

public static void main(String []args) throws Exception {
        AtomicLong longValue = new AtomicLong(0);
        Runnable task = () -> longValue.set(new Date().getTime());
        Function<Long, Date> dateConverter = (longvalue) -> new Date(longvalue);
        Consumer<Date> printer = date -> {
            System.out.println(date);
            System.out.flush();
        };
        // CompletableFuture computation
        CompletableFuture.runAsync(task)
                         .thenApply((v) -> longValue.get())
                         .thenApply(dateConverter)
                         .thenAccept(printer);
}
У методів thenRun, thenApply та thenAccept є версії Async. Це означає, що ці стадії будуть виконані у новому потоці. Він буде взятий з особливого пулу, тому заздалегідь невідомо, який потік буде, новий чи попередній. Все залежить від того, наскільки важкі завдання. Крім цих методів, є ще три цікаві можливості. Для наочності уявімо, що у нас є якийсь сервіс, який отримує якесь повідомлення звідкись, і на це потрібен час:

public static class NewsService {
	public static String getMessage() {
		try {
			Thread.currentThread().sleep(3000);
			return "Message";
		} catch (InterruptedException e) {
			throw new IllegalStateException(e);
		}
	}
}
Тепер, давайте подивимось на інші можливості, які надає CompletableFuture. Ми можемо об'єднувати результат CompletableFuture з результатом іншого CompletableFuture:

Supplier newsSupplier = () -> NewsService.getMessage();
        
CompletableFuture<String> reader = CompletableFuture.supplyAsync(newsSupplier);
CompletableFuture.completedFuture("!!")
				 .thenCombine(reader, (a, b) -> b + a)
				 .thenAccept(result -> System.out.println(result))
				 .get();
Тут варто звернути увагу, що за замовчуванням потоки будуть демо-потоками, тому для наочності ми використовуємо get, щоб дочекатись результату. А ще ми можемо не лише об'єднати (combine), але й повертати CompletableFuture:

CompletableFuture.completedFuture(2L)
				.thenCompose((val) -> CompletableFuture.completedFuture(val + 2))
                               .thenAccept(result -> System.out.println(result));
Тут хочеться зазначити, що для стислості використано метод CompletableFuture.completedFuture. Цей метод не створює новий потік, тому інший ланцюжок буде виконаний у тому ж потоці, в якому був викликаний completedFuture. Також є метод thenAcceptBoth. Він дуже схожий на accept, але якщо thenAccept приймає consumer, то thenAcceptBoth приймає на вхід ще один CompletableStage + BiConsumer, тобто consumer, який на вхід приймає 2 джерела, а не одне. Є ще цікава можливість зі словом Either: Thread'ом Java не зіпсуєш: Частина IV — Callable, Future і друзі - 3Ці методи приймають альтернативний CompletableStage і будуть виконані на тому CompletableStage, який виконається першим. І завершити цей огляд хочеться ще однією цікавою можливістю CompletableFuture — обробкою помилок.

CompletableFuture.completedFuture(2L)
				 .thenApply((a) -> {
					throw new IllegalStateException("помилка");
				 }).thenApply((a) -> 3L)
				 //.exceptionally(ex -> 0L)
				 .thenAccept(val -> System.out.println(val));
Цей код нічого не зробить, оскільки впаде виключення і нічого не буде. Але якщо ми розкоментуємо exceptionally, то визначимо поведінку. На тему CompletableFuture раджу також подивитись наступне відео: На мою скромну думку, ці відео — одні з найнаочніших на просторах інтернету. З них має бути зрозуміло, як це все працює, який у нас є арсенал і навіщо це все потрібно.

Висновок

Сподіваюся, тепер стало зрозуміло, як можна використовувати потоки для отримання обчислень після того, як вони будуть виконані. Додатковий матеріал: