JavaRush /Курсы /JAVA 25 SELF /thenCompose + кастомный Executor + тайм-ауты

thenCompose + кастомный Executor + тайм-ауты

JAVA 25 SELF
55 уровень , 4 лекция
Открыта

1. thenCompose vs. thenApply: разница и когда что использовать

В асинхронном программировании на Java (через CompletableFuture) часто нужно выполнять цепочки действий. Для этого есть два похожих метода: thenApply и thenCompose. Но они работают по-разному!

thenApply

Метод thenApply используется, когда следующий шаг — это простое преобразование значения, без запуска новых асинхронных операций. Он получает результат предыдущего шага, обрабатывает его и возвращает новое значение (не CompletableFuture).

Если вы знакомы со Stream API, то thenApply ведёт себя примерно как map: берёт результат, применяет функцию и возвращает преобразованный вариант.

Пример:

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "42");
CompletableFuture<Integer> lengthFuture = cf.thenApply(s -> s.length());
// lengthFuture содержит 2 (длина строки "42")

Проще говоря, thenApply — это способ сказать: «Когда результат будет готов, сделай с ним вот это».

thenCompose

  • Используется, когда следующий шаг — это ещё одна асинхронная операция (возвращает CompletableFuture).
  • Позволяет «разворачивать» вложенные CompletableFuture (аналог flatMap).
  • Если использовать thenApply с асинхронной функцией, получится CompletableFuture<CompletableFuture<T>> — неудобно!

Пример:

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "user42");

// Допустим, нам нужно по имени пользователя получить его заказы (асинхронно)
CompletableFuture<List<Order>> ordersFuture = cf.thenCompose(username -> fetchOrdersAsync(username));
// fetchOrdersAsync возвращает CompletableFuture<List<Order>>

Визуально:

  • thenApply: CF<String>thenApply(s -> s.length())CF<Integer>
  • thenCompose: CF<User>thenCompose(u -> fetchOrdersAsync(u.id))CF<List<Order>>

Когда использовать что?

  • Функция возвращает обычное значение — используйте thenApply.
  • Функция возвращает CompletableFuture — используйте thenCompose.

Пример ошибки:

cf.thenApply(username -> fetchOrdersAsync(username)); // Получите CF<CF<List<Order>>>
cf.thenCompose(username -> fetchOrdersAsync(username)); // Получите CF<List<Order>>

2. Управление пулом потоков (Executor): зачем и как использовать свой Executor

По умолчанию: ForkJoinPool.commonPool()

Когда вы пишете CompletableFuture.supplyAsync(...) или thenApplyAsync(...) без указания Executor, Java использует общий пул потоков — ForkJoinPool.commonPool(). Это удобно, но не всегда подходит:

  • Если у вас много долгих или блокирующих операций (сетевые запросы, работа с файлами), общий пул может «забиться», и все задачи будут ждать.
  • Иногда нужно изолировать задачи с разными приоритетами или ограничить количество одновременно работающих потоков.

Когда нужен свой Executor?

  • Долгие, блокирующие операции (например, запросы к БД, HTTP-запросы, чтение файлов).
  • Изоляция задач: чтобы пользовательские задачи не мешали системным.
  • Ограничение ресурсов: например, не запускать больше 10 одновременных загрузок.

Как создать свой Executor

Обычно используют ThreadPoolExecutor или фабрики из Executors:

ExecutorService myExecutor = Executors.newFixedThreadPool(10);

Как использовать свой Executor с CompletableFuture

  • В методах supplyAsync, runAsync, thenApplyAsync, thenComposeAsync и других можно передать второй аргумент — ваш Executor.

Примеры:

CompletableFuture<String> cf = CompletableFuture.supplyAsync(
    () -> loadDataFromNetwork(), myExecutor
);

cf.thenApplyAsync(data -> processData(data), myExecutor)
  .thenAcceptAsync(result -> System.out.println(result), myExecutor);

Важно: если не указать Executor, будет использоваться ForkJoinPool.commonPool().

Когда достаточно default Executor?

  • Для коротких, CPU-bound задач (простые вычисления).
  • Когда не важно, в каком потоке выполняется задача.

3. Обработка тайм-аутов: orTimeout и completeOnTimeout

Асинхронные операции могут зависнуть или выполняться слишком долго (например, если сервер не отвечает). Чтобы не ждать вечно, в CompletableFuture есть методы для работы с тайм-аутами.

orTimeout

  • Завершает CompletableFuture с исключением TimeoutException, если операция не завершилась за указанное время.
  • Не отменяет реально выполняющуюся задачу, но downstream-цепочка получит ошибку.

Синтаксис:

cf.orTimeout(3, TimeUnit.SECONDS)
  .exceptionally(ex -> {
      System.out.println("Тайм-аут: " + ex);
      return null;
  });

Пример:

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
    Thread.sleep(5000); // имитируем долгую операцию
    return "OK";
});

cf.orTimeout(2, TimeUnit.SECONDS)
  .exceptionally(ex -> {
      System.out.println("Ошибка: " + ex);
      return "TIMEOUT";
  });

Результат:

Через 2 секунды будет выброшено TimeoutException, и exceptionally обработает ошибку.

completeOnTimeout

  • Завершает CompletableFuture с указанным значением, если операция не завершилась за время тайм-аута.
  • Не выбрасывает исключение, а возвращает «резервное» значение.

Синтаксис:

cf.completeOnTimeout("DEFAULT", 2, TimeUnit.SECONDS);

Пример:

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
    Thread.sleep(5000);
    return "OK";
});

cf.completeOnTimeout("TIMEOUT", 2, TimeUnit.SECONDS)
  .thenAccept(System.out::println); // Через 2 секунды выведет "TIMEOUT"

Сравнение orTimeout и completeOnTimeout

Метод Что делает при тайм-ауте? Как обрабатывается дальше?
orTimeout
Завершает с TimeoutException Можно обработать через exceptionally/handle
completeOnTimeout
Завершает с указанным значением thenAccept/thenApply получит это значение

4. Практика: пример с thenCompose, кастомным Executor и тайм-аутом

Задача:

  • Получить пользователя по id (асинхронно, с задержкой).
  • Затем асинхронно получить список заказов пользователя (тоже с задержкой).
  • Использовать кастомный Executor.
  • Добавить тайм-аут на получение заказов.
import java.util.concurrent.*;
import java.util.*;

public class AsyncDemo {
    static ExecutorService ioExecutor = Executors.newFixedThreadPool(4);

    // Имитация асинхронного получения пользователя
    static CompletableFuture<String> fetchUserAsync(int userId) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            return "user" + userId;
        }, ioExecutor);
    }

    // Имитация асинхронного получения заказов пользователя
    static CompletableFuture<List<String>> fetchOrdersAsync(String username) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(3000); // Долгая операция!
            return List.of("order1", "order2");
        }, ioExecutor);
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException ignored) {}
    }

    public static void main(String[] args) {
        fetchUserAsync(42)
            .thenCompose(username ->
                fetchOrdersAsync(username)
                    .orTimeout(2, TimeUnit.SECONDS) // Тайм-аут на получение заказов
                    .exceptionally(ex -> {
                        System.out.println("Не удалось получить заказы: " + ex);
                        return List.of();
                    })
            )
            .thenAccept(orders -> System.out.println("Заказы: " + orders))
            .join(); // Ждём завершения всей цепочки

        ioExecutor.shutdown();
    }
}

Что происходит:

  • Получаем пользователя (1 секунда).
  • Получаем заказы (3 секунды, но тайм-аут 2 секунды).
  • Если не успели — ловим TimeoutException, возвращаем пустой список.
  • Всё работает на кастомном Executor.

Результат:

Не удалось получить заказы: java.util.concurrent.TimeoutException
Заказы: []

Если уменьшить задержку в fetchOrdersAsync до 1_000 мс — увидите реальные заказы.

5. Типичные ошибки и нюансы

Ошибка №1: Использование thenApply вместо thenCompose для асинхронных операций.
Если функция возвращает CompletableFuture, а вы применили thenApply, получите вложенный тип CompletableFuture<CompletableFuture<T>>. Это усложнит цепочку и приведёт к лишним обёрткам. Решение: используйте thenCompose, чтобы «сплющить» результат в CompletableFuture<T>.

Ошибка №2: Запуск долгих или IO-задач без собственного Executor.
По умолчанию задачи выполняются в ForkJoinPool.commonPool(). Если его перегрузить, задержки начнут расти, а другие задачи в приложении могут замедлиться. Решение: создавайте собственный ExecutorService и передавайте его в supplyAsync/thenApplyAsync.

Ошибка №3: Ожидание, что orTimeout отменяет выполнение задачи.
orTimeout лишь завершает CompletableFuture с исключением по тайм-ауту, но сама задача продолжает работать в фоне. Решение: если нужно остановить выполнение, используйте cancel(true) или собственные механизмы прерывания.

Ошибка №4: Неправильное понимание области действия тайм-аута.
orTimeout и completeOnTimeout работают только для одного конкретного шага цепочки, а не для всей цепи. Решение: если нужен общий тайм-аут на всю цепочку, оберните её в отдельный CompletableFuture и примените тайм-аут к нему.

Ошибка №5: Не закрыт ExecutorService.
Если после выполнения задач не вызвать shutdown()/shutdownNow() у ExecutorService, потоки продолжат работать, и программа может «повиснуть». Решение: всегда закрывайте ExecutorService в finally или используйте try-with-resources в Java 21+.

1
Задача
JAVA 25 SELF, 55 уровень, 4 лекция
Недоступна
Запрос Статуса: Мгновенный Ответ с Резервом
Запрос Статуса: Мгновенный Ответ с Резервом
1
Задача
JAVA 25 SELF, 55 уровень, 4 лекция
Недоступна
Панель Пользователя: Быстрый Доступ к Данным с Умным Управлением
Панель Пользователя: Быстрый Доступ к Данным с Умным Управлением
1
Опрос
Асинхронное программирование, 55 уровень, 4 лекция
Недоступен
Асинхронное программирование
Асинхронное программирование
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ