JavaRush /Java блог /Random /Thread'ом Java не испортишь: Часть IV — Callable, Future ...
Viacheslav
3 уровень

Thread'ом Java не испортишь: Часть IV — Callable, Future и друзья

Статья из группы Random

Вступление

Мы уже рассматривали в первой части, как создаются потоки. Ещё раз вспомним. Thread'ом Java не испортишь: Часть IV — Callable, Future и друзья - 1Поток — это Thread, в нём что-то запускается run, поэтому воспользуемся tutorialspoint java online compiler'ом и выполним следующий код:

public class HelloWorld {
    
    public static void main(String []args){
        Runnable task = () -> {
            System.out.println("Hello World");
        };
        new Thread(task).start();
    }
}
Единтсвенный ли это вариант запуска задачи в потоке?

java.util.concurrent.Callable

Оказывается, у java.lang.Runnable есть брат и зовут его java.util.concurrent.Callable и появился он на свет в Java 1.5. В чём же различия? Если приглядеться к JavaDoc этого интерфейса, мы видим, что в отличие от Runnable, новый интерфейс объявляет метод call, который возвращает результат. Кроме того, по умолчанию он throws Exception. То есть избавляет нас от необходимости на проверяемые исключения писать try-catch блоки. Уже неплохо, правда? Теперь у нас есть вместо Runnable новый task:

Callable task = () -> {
	return "Hello, World!";
};
Но что с ним делать? Зачем нам вообще задача, выполняемая в потоке, которая возвращает результат? Очевидно, что в дальнейшем мы рассчитываем получить результат действий, которыев в будущем будут выполнены. Будущее по-английский — Future. И интерфейс есть с точно таким же именем: java.util.concurrent.Future

java.util.concurrent.Future

Интерфейс java.util.concurrent.Future описывает API для работы с задачами, результат которых мы планируем получить в будущем: методы получения результата, методы проверки статуса. Для Future нас интересует его реализация java.util.concurrent.FutureTask. То есть это Task, который будет выполнен во Future. Чем эта реализация ещё интересна, так это тем, что она реализует и Runnable. Можно считать это своего рода адаптером старой модели работы с задачами в потоках и новой модели (новой в том смысле, что она появилась в java 1.5). Вот пример:

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class HelloWorld {
    
    public static void main(String []args) throws Exception {
        Callable task = () -> {
            return "Hello, World!";
        };
        FutureTask<String> future = new FutureTask<>(task);
        new Thread(future).start();
        System.out.println(future.get());
    }
}
Как видно из примера, мы получаем при помощи метода get результат из задачи task. (!)Важно, что в момент получения результата при помощи метода get выполнение становится синхронным. Как вы думаете, какой механизм тут будет использован? Правильно, нет блока синхронизации — поэтому WAITING в JVisualVM мы увидим не как monitor или wait, а как тот самый park (т.к. используется механизм LockSupport).

Функциональные интерфейсы

Дальше пойдёт речь про классы из Java 1.8, поэтому не лишним будет сделать краткое введение. Посмотрим на следующий код:

Supplier<String> supplier = new Supplier<String>() {
	@Override
	public String get() {
		return "String";
	}
};
Consumer<String> consumer = new Consumer<String>() {
	@Override
	public void accept(String s) {
		System.out.println(s);
	}
};
Function<String, Integer> converter = new Function<String, Integer>() {
	@Override
	public Integer apply(String s) {
		return Integer.valueOf(s);
	}
};
Как же много лишнего кода, не правда ли? Каждый из объявляемых классов выполняет какую-то одну функцию, но для её описания мы используем кучу лишнего вспомогательного кода. И разработчики Java так же подумали. Поэтому, они ввели набор "функциональных интерфейсов" (@FunctionalInterface) и решили, что теперь Java сама будет "додумывать" за нас всё, кроме важного:

Supplier<String> supplier = () -> "String";
Consumer<String> consumer = s -> System.out.println(s);
Function<String, Integer> converter = s -> Integer.valueOf(s);
Supplier — поставщик. Он не имеет параметров, но возвращает что-то, то есть поставляет это. Consumer — потребитель. Он принимает на вход что-то (параметр s) и с этим что-то что-то делает, то есть потребляет что-то. Есть ещё функция. Она принимает на вход что-то (параметр s), что-то делает и возвращает что-то. Как мы видим, активно используются дженерики. В случае неуверенности можно вспомнить про них и прочитать "Теория дженериков в Java или как на практике ставить скобки".

CompletableFuture

Шло время, и в Java 1.8 появился новый класс, который зовётся CompletableFuture. Он реализует интерфейс Future, то есть наши task будут выполнены в будущем, и мы сможем выполнить get и получить результат. Но ещё он реализует некоторый CompletionStage. Из перевода уже понятно его назначение: это некий этап (Stage) каких-то вычислений. С кратким введением в тему можно ознакомиться в обзоре "Introduction to CompletionStage and CompletableFuture". Давайте перейдём сразу к делу. Посмотрим на список доступных статических методов, которые нам помогут начать: Thread'ом Java не испортишь: Часть IV — Callable, Future и друзья - 2Вот варианты их использования:

import java.util.concurrent.CompletableFuture;
public class App {
    public static void main(String []args) throws Exception {
        // CompletableFuture уже содержащий результат
        CompletableFuture<String> completed;
        completed = CompletableFuture.completedFuture("Просто значение");
        // CompletableFuture, запускающий (run) новый поток с Runnable, поэтому он Void
        CompletableFuture<Void> voidCompletableFuture;
        voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("run " + Thread.currentThread().getName());
        });
        // CompletableFuture, запускающий новый поток, результат которого возьмём у Supplier 
        CompletableFuture<String> supplier;
        supplier = CompletableFuture.supplyAsync(() -> {
            System.out.println("supply " + Thread.currentThread().getName());
            return "Значение";
        });
    }
}
Если мы выполним этот код, то увидим, что создание CompletableFuture подразумевает запуск и всей цепочки. Поэтому при некоторой схожести со SteamAPI из Java8 в этом отличие этих подходов. Например:

List<String> array = Arrays.asList("one", "two");
Stream<String> stringStream = array.stream().map(value -> {
	System.out.println("Executed");
	return value.toUpperCase();
});
Это пример Java 8 Stream Api (подробнее можно с ним ознакомиться здесь "Руководство по Java 8 Stream API в картинках и примерах"). Если запустить этот код, то Executed не отобразится. То есть при создании стрима в Java стрим не запускается сразу, а ждёт, когда из него захотят значение. А вот CompletableFuture запускает цепочку на выполнение сразу, не дожидаясь того, что у него попросят посчитанное значение. Считаю важным это понимать. Итак, у нас есть CompletableFuture. Как же мы можем составить цепочку и какие у нас есть средства? Вспомним про функциональные интерфейсы, о которых мы писали ранее.
  • У нас есть функция (Function), которая принимает А и возвращает Б. Имеет единственный метод — apply (применить).
  • У нас есть потребитель (Consumer), которая принимает А и ничего не возвращает (Void). Имеет единственный метод — accept (принять).
  • У нас есть запускаемый в потоке код Runnable, который не принимает и не возвращает. Имеет единственный метод — run (запустить).
Второе, что надо помнить, что CompletalbeFuture в своей работе использует Runnable, потребителей и функции. Учитывая это, вы всегда сможете вспомнить, что с CompletableFuture можно делать так:

public static void main(String []args) throws Exception {
        AtomicLong longValue = new AtomicLong(0);
        Runnable task = () -> longValue.set(new Date().getTime());
        Function<Long, Date> dateConverter = (longvalue) -> new Date(longvalue);
        Consumer<Date> printer = date -> {
            System.out.println(date);
            System.out.flush();
        };
        // CompletableFuture computation
        CompletableFuture.runAsync(task)
                         .thenApply((v) -> longValue.get())
                         .thenApply(dateConverter)
                         .thenAccept(printer);
}
У методов thenRun, thenApply и thenAccept есть версии Async. Это значит, что эти стадии будут выполнены в новом потоке. Он будет взят из особого пула, поэтому заранее неизвестно, какой поток будет, новый или прежний. Всё зависит от того, на сколько тяжёлые задачи. Помимо этих методов есть ещё три интересные возможности. Для наглядности представим, что у нас есть некий сервис, который получает какое-то сообщение откуда-то и на это требуется время:

public static class NewsService {
	public static String getMessage() {
		try {
			Thread.currentThread().sleep(3000);
			return "Message";
		} catch (InterruptedException e) {
			throw new IllegalStateException(e);
		}
	}
}
Теперь, давайте посмотрим на другие возможности, которые предоставляет CompletableFuture. Мы можем объединять результат CompletableFuture с результатом другого CompletableFuture:

Supplier newsSupplier = () -> NewsService.getMessage();
        
CompletableFuture<String> reader = CompletableFuture.supplyAsync(newsSupplier);
CompletableFuture.completedFuture("!!")
				 .thenCombine(reader, (a, b) -> b + a)
				 .thenAccept(result -> System.out.println(result))
				 .get();
Тут стоить обратить внимание, что по умолчанию потоки будут демон-потоками, поэтому для наглядности мы используем get, чтобы дождаться результат. А ещё мы можем не только объединить (combine), но и возвращать CompletableFuture:

CompletableFuture.completedFuture(2L)
				.thenCompose((val) -> CompletableFuture.completedFuture(val + 2))
                               .thenAccept(result -> System.out.println(result));
Тут хочется отметить, что для краткости использован метод CompletableFuture.completedFuture. Данный метод не создаёт новый поток, поэтому остальная цепочка будет выполнена в том же потоке, в котором был вызван completedFuture. Также есть метод thenAcceptBoth. Он очень похож на accept, но если thenAccept принимает consumer, то thenAcceptBoth принимает на вход ещё один CompletableStage + BiConsumer, то есть consumer, который на вход принимает 2 источника, а не один. Есть ещё интересная возможность со словом Either: Thread'ом Java не испортишь: Часть IV — Callable, Future и друзья - 3Данные методы принимают альтернативный CompletableStage и будут выполнены на том CompletableStage, который первее выполнится. И закончить этот обзор хочется ещё одной интересной возможностью CompletableFuture — обработкой ошибок.

CompletableFuture.completedFuture(2L)
				 .thenApply((a) -> {
					throw new IllegalStateException("error");
				 }).thenApply((a) -> 3L)
				 //.exceptionally(ex -> 0L)
				 .thenAccept(val -> System.out.println(val));
Данный код ничего не сделает, т.к. упадёт исключение и ничего не будет. Но если мы раскомментируем exceptionally, то мы определим поведение. На тему CompletableFuture советую так же посмотреть следующее видео: По моему скромному мнению, данные видео — одни из самых наглядных на просторах интернета. Из них должно быть понятно, как это всё работает, какой у нас есть арсенал и зачем это всё нужно.

Заключение

Надеюсь, теперь стало понятно, как можно использовать потоки для получения вычислений после того, как они будут высчитаны. Дополнительный материал: #Viacheslav
Комментарии (65)
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ
16 апреля 2024
Последний ролик вообще не понятен
Sergey Уровень 31
18 ноября 2023

(!)Важно, что в момент получения результата при помощи метода get выполнение становится синхронным. Как вы думаете, какой механизм тут будет использован? Правильно, нет блока синхронизации — поэтому WAITING в JVisualVM мы увидим не как monitor или wait, а как тот самый park (т.к. используется механизм LockSupport).
Интересно что с 13 джавой main в состоянии WAIT. Что-то изменилось в новых версиях?
Anton Seliverstov Уровень 111
14 ноября 2023
Ссылки на дополнительные материалы, которые уже мертвы: 1) "Introduction to CompletionStage and CompletableFuture" - рабочая ссылка 2) "Пишем асинхронный код с CompletableFuture" - рабочая ссылка
Adm Уровень 49
6 ноября 2023
автор спасибо тебе за ссылки,видео и за статьи ты только не удаляй их)
Олег Уровень 79 Expert
26 октября 2023
Заключение Надеюсь, теперь стало понятно, как можно использовать потоки для получения вычислений после того, как они будут высчитаны. Фантазёр
Алексей (pro_bell) Уровень 61
15 октября 2023
Пишем асинхронный код с CompletableFuture - ссылка на ЧТО?
25 августа 2023
Эта статья как будто обнулила все что я вычитывала в сторонних источниках на ту же тему - чтобы понять блок CompletableFuture нужно очень хорошо шарить за CompletableFuture.
wokku Уровень 51
22 августа 2023
Вот это уже хард
Виктор Уровень 111 Expert
10 августа 2023
Нафига тут вообще кусок про функциональные интерфейсы не могу понять
partiec Уровень 33
5 июля 2023
norm