1. thenCompose vs. thenApply: разница и когда что использовать
В асинхронном программировании на Java (через CompletableFuture) часто нужно выполнять цепочки действий. Для этого есть два похожих метода: thenApply и thenCompose. Но они работают по-разному!
thenApply
Метод thenApply используется, когда следующий шаг — это простое преобразование значения, без запуска новых асинхронных операций. Он получает результат предыдущего шага, обрабатывает его и возвращает новое значение (не CompletableFuture).
Если вы знакомы со Stream API, то thenApply ведёт себя примерно как map: берёт результат, применяет функцию и возвращает преобразованный вариант.
Пример:
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "42");
CompletableFuture<Integer> lengthFuture = cf.thenApply(s -> s.length());
// lengthFuture содержит 2 (длина строки "42")
Проще говоря, thenApply — это способ сказать: «Когда результат будет готов, сделай с ним вот это».
thenCompose
- Используется, когда следующий шаг — это ещё одна асинхронная операция (возвращает CompletableFuture).
- Позволяет «разворачивать» вложенные CompletableFuture (аналог flatMap).
- Если использовать thenApply с асинхронной функцией, получится CompletableFuture<CompletableFuture<T>> — неудобно!
Пример:
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "user42");
// Допустим, нам нужно по имени пользователя получить его заказы (асинхронно)
CompletableFuture<List<Order>> ordersFuture = cf.thenCompose(username -> fetchOrdersAsync(username));
// fetchOrdersAsync возвращает CompletableFuture<List<Order>>
Визуально:
- thenApply: CF<String> → thenApply(s -> s.length()) → CF<Integer>
- thenCompose: CF<User> → thenCompose(u -> fetchOrdersAsync(u.id)) → CF<List<Order>>
Когда использовать что?
- Функция возвращает обычное значение — используйте thenApply.
- Функция возвращает CompletableFuture — используйте thenCompose.
Пример ошибки:
cf.thenApply(username -> fetchOrdersAsync(username)); // Получите CF<CF<List<Order>>>
cf.thenCompose(username -> fetchOrdersAsync(username)); // Получите CF<List<Order>>
2. Управление пулом потоков (Executor): зачем и как использовать свой Executor
По умолчанию: ForkJoinPool.commonPool()
Когда вы пишете CompletableFuture.supplyAsync(...) или thenApplyAsync(...) без указания Executor, Java использует общий пул потоков — ForkJoinPool.commonPool(). Это удобно, но не всегда подходит:
- Если у вас много долгих или блокирующих операций (сетевые запросы, работа с файлами), общий пул может «забиться», и все задачи будут ждать.
- Иногда нужно изолировать задачи с разными приоритетами или ограничить количество одновременно работающих потоков.
Когда нужен свой Executor?
- Долгие, блокирующие операции (например, запросы к БД, HTTP-запросы, чтение файлов).
- Изоляция задач: чтобы пользовательские задачи не мешали системным.
- Ограничение ресурсов: например, не запускать больше 10 одновременных загрузок.
Как создать свой Executor
Обычно используют ThreadPoolExecutor или фабрики из Executors:
ExecutorService myExecutor = Executors.newFixedThreadPool(10);
Как использовать свой Executor с CompletableFuture
- В методах supplyAsync, runAsync, thenApplyAsync, thenComposeAsync и других можно передать второй аргумент — ваш Executor.
Примеры:
CompletableFuture<String> cf = CompletableFuture.supplyAsync(
() -> loadDataFromNetwork(), myExecutor
);
cf.thenApplyAsync(data -> processData(data), myExecutor)
.thenAcceptAsync(result -> System.out.println(result), myExecutor);
Важно: если не указать Executor, будет использоваться ForkJoinPool.commonPool().
Когда достаточно default Executor?
- Для коротких, CPU-bound задач (простые вычисления).
- Когда не важно, в каком потоке выполняется задача.
3. Обработка тайм-аутов: orTimeout и completeOnTimeout
Асинхронные операции могут зависнуть или выполняться слишком долго (например, если сервер не отвечает). Чтобы не ждать вечно, в CompletableFuture есть методы для работы с тайм-аутами.
orTimeout
- Завершает CompletableFuture с исключением TimeoutException, если операция не завершилась за указанное время.
- Не отменяет реально выполняющуюся задачу, но downstream-цепочка получит ошибку.
Синтаксис:
cf.orTimeout(3, TimeUnit.SECONDS)
.exceptionally(ex -> {
System.out.println("Тайм-аут: " + ex);
return null;
});
Пример:
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
Thread.sleep(5000); // имитируем долгую операцию
return "OK";
});
cf.orTimeout(2, TimeUnit.SECONDS)
.exceptionally(ex -> {
System.out.println("Ошибка: " + ex);
return "TIMEOUT";
});
Результат:
Через 2 секунды будет выброшено TimeoutException, и exceptionally обработает ошибку.
completeOnTimeout
- Завершает CompletableFuture с указанным значением, если операция не завершилась за время тайм-аута.
- Не выбрасывает исключение, а возвращает «резервное» значение.
Синтаксис:
cf.completeOnTimeout("DEFAULT", 2, TimeUnit.SECONDS);
Пример:
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
Thread.sleep(5000);
return "OK";
});
cf.completeOnTimeout("TIMEOUT", 2, TimeUnit.SECONDS)
.thenAccept(System.out::println); // Через 2 секунды выведет "TIMEOUT"
Сравнение orTimeout и completeOnTimeout
| Метод | Что делает при тайм-ауте? | Как обрабатывается дальше? |
|---|---|---|
|
Завершает с TimeoutException | Можно обработать через exceptionally/handle |
|
Завершает с указанным значением | thenAccept/thenApply получит это значение |
4. Практика: пример с thenCompose, кастомным Executor и тайм-аутом
Задача:
- Получить пользователя по id (асинхронно, с задержкой).
- Затем асинхронно получить список заказов пользователя (тоже с задержкой).
- Использовать кастомный Executor.
- Добавить тайм-аут на получение заказов.
import java.util.concurrent.*;
import java.util.*;
public class AsyncDemo {
static ExecutorService ioExecutor = Executors.newFixedThreadPool(4);
// Имитация асинхронного получения пользователя
static CompletableFuture<String> fetchUserAsync(int userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "user" + userId;
}, ioExecutor);
}
// Имитация асинхронного получения заказов пользователя
static CompletableFuture<List<String>> fetchOrdersAsync(String username) {
return CompletableFuture.supplyAsync(() -> {
sleep(3000); // Долгая операция!
return List.of("order1", "order2");
}, ioExecutor);
}
static void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException ignored) {}
}
public static void main(String[] args) {
fetchUserAsync(42)
.thenCompose(username ->
fetchOrdersAsync(username)
.orTimeout(2, TimeUnit.SECONDS) // Тайм-аут на получение заказов
.exceptionally(ex -> {
System.out.println("Не удалось получить заказы: " + ex);
return List.of();
})
)
.thenAccept(orders -> System.out.println("Заказы: " + orders))
.join(); // Ждём завершения всей цепочки
ioExecutor.shutdown();
}
}
Что происходит:
- Получаем пользователя (1 секунда).
- Получаем заказы (3 секунды, но тайм-аут 2 секунды).
- Если не успели — ловим TimeoutException, возвращаем пустой список.
- Всё работает на кастомном Executor.
Результат:
Не удалось получить заказы: java.util.concurrent.TimeoutException
Заказы: []
Если уменьшить задержку в fetchOrdersAsync до 1_000 мс — увидите реальные заказы.
5. Типичные ошибки и нюансы
Ошибка №1: Использование thenApply вместо thenCompose для асинхронных операций.
Если функция возвращает CompletableFuture, а вы применили thenApply, получите вложенный тип CompletableFuture<CompletableFuture<T>>. Это усложнит цепочку и приведёт к лишним обёрткам. Решение: используйте thenCompose, чтобы «сплющить» результат в CompletableFuture<T>.
Ошибка №2: Запуск долгих или IO-задач без собственного Executor.
По умолчанию задачи выполняются в ForkJoinPool.commonPool(). Если его перегрузить, задержки начнут расти, а другие задачи в приложении могут замедлиться. Решение: создавайте собственный ExecutorService и передавайте его в supplyAsync/thenApplyAsync.
Ошибка №3: Ожидание, что orTimeout отменяет выполнение задачи.
orTimeout лишь завершает CompletableFuture с исключением по тайм-ауту, но сама задача продолжает работать в фоне. Решение: если нужно остановить выполнение, используйте cancel(true) или собственные механизмы прерывания.
Ошибка №4: Неправильное понимание области действия тайм-аута.
orTimeout и completeOnTimeout работают только для одного конкретного шага цепочки, а не для всей цепи. Решение: если нужен общий тайм-аут на всю цепочку, оберните её в отдельный CompletableFuture и примените тайм-аут к нему.
Ошибка №5: Не закрыт ExecutorService.
Если после выполнения задач не вызвать shutdown()/shutdownNow() у ExecutorService, потоки продолжат работать, и программа может «повиснуть». Решение: всегда закрывайте ExecutorService в finally или используйте try-with-resources в Java 21+.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ