JavaRush /Java блог /Random UA /Thread'ом Java не зіпсуєш: Частина IV - Callable, Future ...
Viacheslav
3 рівень

Thread'ом Java не зіпсуєш: Частина IV - Callable, Future та друзі

Стаття з групи Random UA

Вступ

Ми вже розглядали у першій частині , як створюються потоки. Ще раз згадаємо. Thread'ом Java не зіпсуєш: Частина IV - Callable, Future та друзі - 1Потік - це Thread, в ньому щось запускається run, тому скористаємося tutorialspoint java online compiler'ом і виконаємо наступний код:
public class HelloWorld {

    public static void main(String []args){
        Runnable task = () -> {
            System.out.println("Hello World");
        };
        new Thread(task).start();
    }
}
Чи єдиний це варіант запуску завдання в потоці?

java.util.concurrent.Callable

Виявляється, у java.lang.Runnable є брат і звуть його java.util.concurrent.Callable і з'явився він на світ Java 1.5. У чому ж різниця? Якщо придивитися до JavaDoc цього інтерфейсу, ми бачимо, що на відміну від Runnable, новий інтерфейс оголошує метод call, який повертає результат. Крім того, за умовчанням він throws Exception. Тобто позбавляє нас необхідності на виключення, що перевіряються, писати try-catchблоки. Вже непогано, правда? Тепер у нас є замість Runnableнового task:
Callable task = () -> {
	return "Hello, World!";
};
Але що з нею робити? Навіщо нам взагалі завдання, яке виконується в потоці, яке повертає результат? Очевидно, що надалі ми розраховуємо отримати результат дій, які в майбутньому будуть виконані. Майбутнє англійською — Future. І інтерфейс є з таким самим ім'ям:java.util.concurrent.Future

java.util.concurrent.Future

Інтерфейс java.util.concurrent.Future описує API для роботи із завданнями, результат яких ми плануємо отримати в майбутньому: методи отримання результату, методи перевірки статусу. Для Futureнас цікавить його реалізація java.util.concurrent.FutureTask . Тобто це Task, який буде виконано Future. Чим ця реалізація ще цікава, то це тим, що вона реалізує і Runnable. Можна вважати це свого роду адаптером старої моделі роботи із завданнями в потоках та нової моделі (нової в тому сенсі, що вона з'явилася в java 1.5). Ось приклад:
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class HelloWorld {

    public static void main(String []args) throws Exception {
        Callable task = () -> {
            return "Hello, World!";
        };
        FutureTask<String> future = new FutureTask<>(task);
        new Thread(future).start();
        System.out.println(future.get());
    }
}
Як видно з прикладу, ми отримуємо за допомогою методу getрезультат із завдання task. (!) Важливо, що у момент отримання результату з допомогою методу getвиконання стає синхронним. Як ви вважаєте, який механізм тут буде використаний? Правильно, немає блоку синхронізації - тому WAITING в JVisualVM ми побачимо не як monitorабо wait, а як цей park(т.к. використовується механізм LockSupport).

Функціональні інтерфейси

Далі йтиметься про класи з Java 1.8, тому не зайвим буде зробити короткий вступ. Подивимося на наступний код:
Supplier<String> supplier = new Supplier<String>() {
	@Override
	public String get() {
		return "String";
	}
};
Consumer<String> consumer = new Consumer<String>() {
	@Override
	public void accept(String s) {
		System.out.println(s);
	}
};
Function<String, Integer> converter = new Function<String, Integer>() {
	@Override
	public Integer apply(String s) {
		return Integer.valueOf(s);
	}
};
Як же багато зайвого коду, чи не так? Кожен з класів, що оголошуються, виконує якусь одну функцію, але для її опису ми використовуємо купу зайвого допоміжного коду. І розробники Java так само подумали. Тому вони ввели набір "функціональних інтерфейсів" ( @FunctionalInterface) і вирішабо, що тепер Java сама "додумуватиме" за нас все, крім важливого:
Supplier<String> supplier = () -> "String";
Consumer<String> consumer = s -> System.out.println(s);
Function<String, Integer> converter = s -> Integer.valueOf(s);
Supplier- Постачальник. Він не має параметрів, але повертає щось, тобто постачає це. Consumer- Споживач. Він приймає на вхід щось (параметр s) і з цим щось робить, тобто споживає щось. Є ще функція. Вона приймає на вхід щось (параметр s), щось робить і щось повертає. Як бачимо, активно використовуються дженерики. У разі невпевненості можна згадати про них і прочитати " Теорія дженериків у Java або як на практиці ставити дужки ".

CompletableFuture

Минав час, і в Java 1.8 з'явився новий клас, який зветься CompletableFuture. Він реалізує інтерфейс Future, тобто наші taskбудуть виконані в майбутньому, і ми зможемо виконати getта отримати результат. Але ще він реалізує деякий CompletionStage. З перекладу зрозуміло його призначення: це якийсь етап (Stage) якихось обчислень. З коротким введенням у тему можна ознайомитись у огляді " Introduction to CompletionStage and CompletableFuture ". Давайте перейдемо одразу до справи. Подивімося на список доступних статичних методів, які нам допоможуть розпочати: Thread'ом Java не зіпсуєш: Частина IV - Callable, Future та друзі - 2Ось варіанти їх використання:
import java.util.concurrent.CompletableFuture;
public class App {
    public static void main(String []args) throws Exception {
        // CompletableFuture уже содержащий результат
        CompletableFuture<String> completed;
        completed = CompletableFuture.completedFuture("Просто значення");
        // CompletableFuture, запускающий (run) новый поток с Runnable, поэтому он Void
        CompletableFuture<Void> voidCompletableFuture;
        voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("run " + Thread.currentThread().getName());
        });
        // CompletableFuture, запускающий новый поток, результат которого возьмём у Supplier
        CompletableFuture<String> supplier;
        supplier = CompletableFuture.supplyAsync(() -> {
            System.out.println("supply " + Thread.currentThread().getName());
            return "Значение";
        });
    }
}
Якщо ми виконаємо цей код, то побачимо, що створення CompletableFutureпередбачає запуск і всього ланцюжка. Тому при певній схожості зі SteamAPI з Java8 у цьому відмінність цих підходів. Наприклад:
List<String> array = Arrays.asList("one", "two");
Stream<String> stringStream = array.stream().map(value -> {
	System.out.println("Executed");
	return value.toUpperCase();
});
Це приклад Java 8 Stream Api (докладніше можна з ним ознайомитися тут " Посібник з Java 8 Stream API в картинках і прикладах "). Якщо запустити цей код, Executedне з'явиться. Тобто при створенні стриму Java стрім не запускається відразу, а чекає, коли з нього захочуть значення. А ось CompletableFutureзапускає ланцюжок на виконання одразу, не чекаючи на те, що в нього попросять пораховане значення. Вважаю важливим це розуміти. Отже, ми маємо CompletableFuture. Як же ми можемо скласти ланцюжок і які ми маємо кошти? Згадаймо про функціональні інтерфейси, про які ми писали раніше.
  • Ми маємо функцію ( Function), яка приймає А і повертає Б. Має єдиний метод — apply(застосувати).
  • У нас є споживач ( Consumer), який приймає А і нічого не повертає (Void). Має єдиний метод – accept(прийняти).
  • У нас є код, що запускається в потоці Runnable, який не приймає і не повертає. Має єдиний метод run(запустити).
Друге, що треба пам'ятати, що CompletalbeFutureу своїй роботі використовує Runnable, споживачів та функції. Враховуючи це, ви завжди зможете згадати, що CompletableFutureможна робити так:
public static void main(String []args) throws Exception {
        AtomicLong longValue = new AtomicLong(0);
        Runnable task = () -> longValue.set(new Date().getTime());
        Function<Long, Date> dateConverter = (longvalue) -> new Date(longvalue);
        Consumer<Date> printer = date -> {
            System.out.println(date);
            System.out.flush();
        };
        // CompletableFuture computation
        CompletableFuture.runAsync(task)
                         .thenApply((v) -> longValue.get())
                         .thenApply(dateConverter)
                         .thenAccept(printer);
}
У методів thenRun, thenApplyі thenAcceptє версії Async. Це означає, що ці стадії будуть виконані у новому потоці. Його буде взято з особливого пулу, тому заздалегідь невідомо, який потік буде, новий чи колишній. Все залежить від того, наскільки важкі завдання. Крім цих методів, є ще три цікаві можливості. Для наочності уявімо, що у нас є якийсь сервіс, який отримує якесь повідомлення звідкись і на це потрібен час:
public static class NewsService {
	public static String getMessage() {
		try {
			Thread.currentThread().sleep(3000);
			return "Message";
		} catch (InterruptedException e) {
			throw new IllegalStateException(e);
		}
	}
}
Тепер, давайте подивимося на інші можливості, які надає CompletableFuture. Ми можемо поєднувати результат CompletableFutureз результатом іншого CompletableFuture:
Supplier newsSupplier = () -> NewsService.getMessage();

CompletableFuture<String> reader = CompletableFuture.supplyAsync(newsSupplier);
CompletableFuture.completedFuture("!!")
				 .thenCombine(reader, (a, b) -> b + a)
				 .thenAccept(result -> System.out.println(result))
				 .get();
Тут варто звернути увагу, що за замовчуванням потоки будуть демон-потоками, тому для наочності ми використовуємо getдля того, щоб дочекатися результату. А ще ми можемо не тільки об'єднати (combine), але й повертати CompletableFuture:
CompletableFuture.completedFuture(2L)
				.thenCompose((val) -> CompletableFuture.completedFuture(val + 2))
                               .thenAccept(result -> System.out.println(result));
Тут хочеться відзначити, що для стислості використаний метод CompletableFuture.completedFuture. Даний метод не створює новий потік, тому решта ланцюжка буде виконана в тому ж потоці, в якому був викликаний completedFuture. Також є метод thenAcceptBoth. Він дуже схожий на accept, але якщо thenAcceptприймає consumer, то thenAcceptBothприймає на вхід ще один CompletableStage+ BiConsumer, тобто consumerякий на вхід приймає 2 джерела, а не один. Є ще цікава можливість зі словом Either: Thread'ом Java не зіпсуєш: Частина IV - Callable, Future та друзі - 3Дані методи приймають альтернативний CompletableStageі будуть виконані на тому CompletableStage, який першим виконається. І закінчити цей огляд хочеться ще однією цікавою можливістю CompletableFutureобробкою помилок.
CompletableFuture.completedFuture(2L)
				 .thenApply((a) -> {
					throw new IllegalStateException("error");
				 }).thenApply((a) -> 3L)
				 //.exceptionally(ex -> 0L)
				 .thenAccept(val -> System.out.println(val));
Цей код нічого не зробить, т.к. впаде виняток і нічого не буде. Але якщо ми розкоментуємо exceptionally, то визначимо поведінку. На тему CompletableFutureраджу також подивитись наступне відео: На мою скромну думку, дані відео – одні з найнаочніших на просторах інтернету. З них має бути зрозумілим, як це все працює, який у нас є арсенал і навіщо це все потрібно.

Висновок

Сподіваюся тепер стало зрозуміло, як можна використовувати потоки для отримання обчислень після того, як вони будуть обчислені. Додатковий матеріал: #Viacheslav
Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ