JavaRush /Курсы /JAVA 25 SELF /Structured Concurrency

Structured Concurrency

JAVA 25 SELF
58 уровень , 0 лекция
Открыта

1. Введение

Когда потоки играют каждый свою партию

Обычная многопоточность часто напоминает репетицию без дирижёра. Каждый поток — как музыкант, который играет свою мелодию, не слушая остальных. Кто-то закончил раньше и пошёл курить, кто-то застрял на одном аккорде, кто-то вообще перепутал ноты и выдал ошибку. В результате получается не симфония, а какофония: разобраться, кто где сбился, почти невозможно, а остановить всех разом — ещё тот квест.

Structured Concurrency решает эту проблему. Она делает из разрозненных потоков настоящий ансамбль: все задачи объединены под одним «дирижёром». Если он даёт отбой — оркестр замолкает. Если один музыкант облажался — остальные аккуратно останавливаются, не ломая общую гармонию. Все результаты и ошибки собираются централизованно, а не разбросаны по углам кода.

Представьте: Вы не бросаете музыкантов играть кто во что горазд, а собираете их в один зал. Есть дирижёр, есть партитура, и даже если труба фальшивит — оркестр не срывается, а завершает выступление красиво.

Что даёт Structured Concurrency

  • Единый «скоуп» задачи: все подзадачи живут в рамках одного блока кода, их жизненный цикл ограничен этим блоком.
  • Предсказуемое завершение: родительский поток не завершится, пока не завершатся все подзадачи.
  • Централизованная отмена: если одна задача упала или родитель решил завершиться — все подзадачи корректно отменяются.
  • Консистентная обработка ошибок: ошибки подзадач агрегируются, можно получить «дерево причин» (tree of causes).
  • Чистый и читаемый код: нет «висящих» потоков, нет забытых задач, нет гонок за отмену.

Structured Concurrency — это не просто новый API, а новый стиль мышления: задачи должны быть структурированы так же, как и обычные блоки кода (например, try-with-resources).

2. Статус Structured Concurrency в Java

На момент написания курса Structured Concurrency находится в статусе Preview (Java 21–23), но ожидается продвижение к GA (General Availability) в Java 24/25. API находится в пакете jdk.incubator.concurrent. Перед использованием в продакшене обязательно проверьте актуальные релиз-ноты вашей версии JDK!

Главные классы:

  • StructuredTaskScope — базовый класс для управления группой задач.
  • Варианты: StructuredTaskScope.ShutdownOnFailure, StructuredTaskScope.ShutdownOnSuccess — политики завершения задач.

Основные концепции StructuredTaskScope

Модель: fork, join и дружеская перекличка результатов

Когда дирижёр (то есть родительская задача) даёт знак — подзадачи разбегаются по своим партиям. Этот момент называется fork — как будто вы запускаете музыкантов играть свои кусочки в разных залах.

Потом приходит время join — дирижёр поднимает палочку, и все возвращаются, чтобы сыграть финальный аккорд вместе.

А дальше можно спросить у каждого участника, как всё прошло:

  • через resultNow() получить результат сразу, если всё сыграно без ошибок;
  • через throwIfFailed() — убедиться, что никто не сфальшивил. Если кто-то всё-таки запутался в нотах, выбрасывается единое исключение — как если бы дирижёр сказал: «У нас сбой в оркестре, начинаем заново».

Политики завершения

У любого дирижёра есть своё правило, когда останавливать музыку. В Structured Concurrency это задаётся политикой завершения:

  • ShutdownOnFailure — если хоть один музыкант сбился с ритма, дирижёр машет рукой: «Стоп! Начинаем сначала». Все остальные сразу прекращают играть.
  • ShutdownOnSuccess — наоборот, как только кто-то идеально сыграл свою партию, дирижёр доволен: «Хватит, дальше не надо, у нас уже есть победитель». Остальные замолкают — политика первого успешного ответа.

Работа с виртуальными потоками

Каждую подзадачу StructuredTaskScope запускает в виртуальном потоке. Это как если бы у вас был оркестр, где каждый музыкант понятливый и быстрый, без капризов и требований к сцене. Можете смело создавать сотни, тысячи таких исполнителей — это не тяжеловесные треды, а почти невесомые ноты, которые звучат ровно тогда, когда нужно.

3. Пример: агрегатор HTTP-запросов

Рассмотрим практическую задачу: у нас есть три источника данных (например, три разных сервера), и мы хотим получить ответ либо от всех (и агрегировать), либо от первого, кто ответит успешно.

Вариант 1: «Все должны быть успешны» (ShutdownOnFailure)

import jdk.incubator.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;

public class AggregatorAllSuccess {
    public static void main(String[] args) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Future<String> f1 = scope.fork(() -> fetchFromSource1());
            Future<String> f2 = scope.fork(() -> fetchFromSource2());
            Future<String> f3 = scope.fork(() -> fetchFromSource3());

            scope.join(); // ждем завершения всех задач
            scope.throwIfFailed(); // если хоть одна упала — бросаем исключение

            // Все задачи успешны — можно агрегировать результаты
            String result = f1.resultNow() + f2.resultNow() + f3.resultNow();
            System.out.println("Агрегированный результат: " + result);
        }
    }

    static String fetchFromSource1() { /* ... */ return "A"; }
    static String fetchFromSource2() { /* ... */ return "B"; }
    static String fetchFromSource3() { /* ... */ return "C"; }
}

Что происходит:

  • Все три задачи запускаются параллельно (в виртуальных потоках).
  • Если хотя бы одна упала — остальные отменяются, бросается исключение.
  • Если все успешны — можно безопасно агрегировать результаты.

Вариант 2: «Успех по первому валидному» (ShutdownOnSuccess)

import jdk.incubator.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;

public class AggregatorFirstSuccess {
    public static void main(String[] args) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
            Future<String> f1 = scope.fork(() -> fetchFromSource1());
            Future<String> f2 = scope.fork(() -> fetchFromSource2());
            Future<String> f3 = scope.fork(() -> fetchFromSource3());

            scope.join(); // ждем первого успешного
            scope.throwIfFailed(); // если все упали — бросаем исключение

            String result = scope.result(); // результат первой успешной задачи
            System.out.println("Первый успешный результат: " + result);
        }
    }

    static String fetchFromSource1() { /* ... */ return "A"; }
    static String fetchFromSource2() { /* ... */ return "B"; }
    static String fetchFromSource3() { /* ... */ return "C"; }
}

Что происходит:

  • Как только одна задача завершилась успешно — остальные отменяются.
  • Если все упали — бросается исключение.

4. Автоматическая отмена и деградация

StructuredTaskScope сам заботится об отмене оставшихся задач, если политика этого требует. Например, если одна задача упала (ShutdownOnFailure) или одна успешно завершилась (ShutdownOnSuccess), остальные задачи получают сигнал отмены (interrupt).

Пример: корректное завершение с тайм-аутом

import jdk.incubator.concurrent.StructuredTaskScope;
import java.time.Instant;
import java.util.concurrent.Future;

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Future<String> f1 = scope.fork(() -> fetchWithTimeout());
    Future<String> f2 = scope.fork(() -> fetchWithTimeout());

    scope.joinUntil(Instant.now().plusSeconds(2)); // ждем максимум 2 секунды
    scope.throwIfFailed();

    String result = f1.resultNow() + f2.resultNow();
    System.out.println(result);
}

Если задачи не завершились за 2 секунды — будет выброшено исключение, все задачи отменятся.

5. Ошибки и обработка исключений

Как исключения подзадач промаршрутизируются в scope

Иногда во время концерта кто-то всё-таки промахивается по нотам — StructuredTaskScope не делает вид, что ничего не произошло. Он аккуратно записывает, кто именно сыграл мимо кассы, и потом передаёт дирижёру полный отчёт. Когда вы вызываете throwIfFailed(), он бросает агрегированное исключение — что-то вроде сводного отчёта: «Вот список тех, кто сегодня выдал фальшивую ноту». Если нужно, можно развернуть это «дерево причин» и посмотреть, кто конкретно подвёл. А если хочется узнать про конкретного исполнителя — Future.exceptionNow() расскажет, чем именно закончилась его партия.

Когда отмена — это не провал

Важно помнить: отмена задачи не всегда значит ошибку. Если дирижёр сказал «всё, концерт окончен», то музыканты просто складывают инструменты — это cancelled, но не failed. Ошибкой считается только та ситуация, когда кто-то реально сыграл не то, и это исключение уже попадёт в общую сводку.

Пример: дерево причин

import jdk.incubator.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Future<String> f1 = scope.fork(() -> { throw new RuntimeException("Ошибка 1"); });
    Future<String> f2 = scope.fork(() -> { throw new RuntimeException("Ошибка 2"); });

    scope.join();
    scope.throwIfFailed(); // бросит исключение с обеими причинами
} catch (Exception e) {
    e.printStackTrace();
    // Можно получить suppressed exceptions через e.getSuppressed()
}

6. Сравнение с CompletableFuture

StructuredTaskScope и CompletableFuture — оба позволяют запускать параллельные задачи, но:

  • StructuredTaskScope удобен, когда задачи логически связаны и должны завершаться/отменяться вместе (иерархия задач).
  • CompletableFuture хорош для композиции задач без иерархии (например, цепочки преобразований, реактивные сценарии).

Когда StructuredTaskScope упрощает код:

  • Когда нужно гарантировать, что все подзадачи завершились перед выходом из блока.
  • Когда нужна централизованная отмена и обработка ошибок.
  • Когда важно, чтобы не осталось «висящих» задач.

Когда CompletableFuture удобнее:

  • Когда задачи не связаны и могут жить своей жизнью.
  • Когда нужна сложная композиция (thenCombine, thenCompose и т.д.).

7. Практика: агрегатор HTTP-запросов

Задача: отправить запросы к 3 источникам, получить первый успешный ответ

import jdk.incubator.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;

public class HttpAggregator {
    public static void main(String[] args) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
            Future<String> f1 = scope.fork(() -> httpRequest("https://api1.example.com"));
            Future<String> f2 = scope.fork(() -> httpRequest("https://api2.example.com"));
            Future<String> f3 = scope.fork(() -> httpRequest("https://api3.example.com"));

            scope.join();
            scope.throwIfFailed();

            String result = scope.result();
            System.out.println("Первый успешный ответ: " + result);
        }
    }

    static String httpRequest(String url) throws Exception {
        // Имитация запроса (можно использовать HttpClient)
        Thread.sleep((long) (Math.random() * 1000));
        if (Math.random() < 0.3) throw new RuntimeException("Ошибка запроса: " + url);
        return "Ответ от " + url;
    }
}

Задача: если одна подзадача упала — корректно гасим остальные

import jdk.incubator.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Future<String> f1 = scope.fork(() -> httpRequest("https://api1.example.com"));
    Future<String> f2 = scope.fork(() -> httpRequest("https://api2.example.com"));

    scope.join();
    scope.throwIfFailed();

    String result = f1.resultNow() + f2.resultNow();
    System.out.println("Оба ответа: " + result);
} catch (Exception e) {
    System.err.println("Ошибка в одной из задач: " + e.getMessage());
}

8. Типичные ошибки при работе с StructuredTaskScope

Ошибка №1: забыли вызвать join() или throwIfFailed().
Если не вызвать join(), задачи могут не завершиться до выхода из блока. Если не вызвать throwIfFailed(), ошибки подзадач останутся незамеченными.

Ошибка №2: попытка получить результат до завершения задачи.
Вызов resultNow() до завершения задачи бросит IllegalStateException. Сначала дождитесь завершения через join().

Ошибка №3: игнорирование отмены.
Если задача была отменена (например, из-за политики scope), не пытайтесь получить её результат — будет исключение.

Ошибка №4: смешивание разных политик завершения.
Не стоит пытаться вручную отменять задачи внутри scope — используйте политики ShutdownOnFailure или ShutdownOnSuccess.

Ошибка №5: запуск долгих CPU-bound задач в виртуальных потоках.
StructuredTaskScope по умолчанию использует виртуальные потоки — они идеальны для I/O-bound задач, но не ускоряют тяжёлые вычисления.

Ошибка №6: забыли закрыть scope (нет try-with-resources).
StructuredTaskScope реализует AutoCloseable — всегда используйте try-with-resources, чтобы гарантировать завершение всех задач.

1
Задача
JAVA 25 SELF, 58 уровень, 0 лекция
Недоступна
Координация Отделов: Проект "Космический Модуль" 🚀
Координация Отделов: Проект "Космический Модуль" 🚀
1
Задача
JAVA 25 SELF, 58 уровень, 0 лекция
Недоступна
Поиск Быстрейшего Сервера: Система "Сверхзвуковая Доставка Данных" ⚡
Поиск Быстрейшего Сервера: Система "Сверхзвуковая Доставка Данных" ⚡
Комментарии (2)
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ
Andrey Уровень 1
6 ноября 2025
58+
nastya_zhadan Уровень 66
4 ноября 2025
58