1. Введение
Когда потоки играют каждый свою партию
Обычная многопоточность часто напоминает репетицию без дирижёра. Каждый поток — как музыкант, который играет свою мелодию, не слушая остальных. Кто-то закончил раньше и пошёл курить, кто-то застрял на одном аккорде, кто-то вообще перепутал ноты и выдал ошибку. В результате получается не симфония, а какофония: разобраться, кто где сбился, почти невозможно, а остановить всех разом — ещё тот квест.
Structured Concurrency решает эту проблему. Она делает из разрозненных потоков настоящий ансамбль: все задачи объединены под одним «дирижёром». Если он даёт отбой — оркестр замолкает. Если один музыкант облажался — остальные аккуратно останавливаются, не ломая общую гармонию. Все результаты и ошибки собираются централизованно, а не разбросаны по углам кода.
Представьте: Вы не бросаете музыкантов играть кто во что горазд, а собираете их в один зал. Есть дирижёр, есть партитура, и даже если труба фальшивит — оркестр не срывается, а завершает выступление красиво.
Что даёт Structured Concurrency
- Единый «скоуп» задачи: все подзадачи живут в рамках одного блока кода, их жизненный цикл ограничен этим блоком.
- Предсказуемое завершение: родительский поток не завершится, пока не завершатся все подзадачи.
- Централизованная отмена: если одна задача упала или родитель решил завершиться — все подзадачи корректно отменяются.
- Консистентная обработка ошибок: ошибки подзадач агрегируются, можно получить «дерево причин» (tree of causes).
- Чистый и читаемый код: нет «висящих» потоков, нет забытых задач, нет гонок за отмену.
Structured Concurrency — это не просто новый API, а новый стиль мышления: задачи должны быть структурированы так же, как и обычные блоки кода (например, try-with-resources).
2. Статус Structured Concurrency в Java
На момент написания курса Structured Concurrency находится в статусе Preview (Java 21–23), но ожидается продвижение к GA (General Availability) в Java 24/25. API находится в пакете jdk.incubator.concurrent. Перед использованием в продакшене обязательно проверьте актуальные релиз-ноты вашей версии JDK!
Главные классы:
- StructuredTaskScope — базовый класс для управления группой задач.
- Варианты: StructuredTaskScope.ShutdownOnFailure, StructuredTaskScope.ShutdownOnSuccess — политики завершения задач.
Основные концепции StructuredTaskScope
Модель: fork, join и дружеская перекличка результатов
Когда дирижёр (то есть родительская задача) даёт знак — подзадачи разбегаются по своим партиям. Этот момент называется fork — как будто вы запускаете музыкантов играть свои кусочки в разных залах.
Потом приходит время join — дирижёр поднимает палочку, и все возвращаются, чтобы сыграть финальный аккорд вместе.
А дальше можно спросить у каждого участника, как всё прошло:
- через resultNow() получить результат сразу, если всё сыграно без ошибок;
- через throwIfFailed() — убедиться, что никто не сфальшивил. Если кто-то всё-таки запутался в нотах, выбрасывается единое исключение — как если бы дирижёр сказал: «У нас сбой в оркестре, начинаем заново».
Политики завершения
У любого дирижёра есть своё правило, когда останавливать музыку. В Structured Concurrency это задаётся политикой завершения:
- ShutdownOnFailure — если хоть один музыкант сбился с ритма, дирижёр машет рукой: «Стоп! Начинаем сначала». Все остальные сразу прекращают играть.
- ShutdownOnSuccess — наоборот, как только кто-то идеально сыграл свою партию, дирижёр доволен: «Хватит, дальше не надо, у нас уже есть победитель». Остальные замолкают — политика первого успешного ответа.
Работа с виртуальными потоками
Каждую подзадачу StructuredTaskScope запускает в виртуальном потоке. Это как если бы у вас был оркестр, где каждый музыкант понятливый и быстрый, без капризов и требований к сцене. Можете смело создавать сотни, тысячи таких исполнителей — это не тяжеловесные треды, а почти невесомые ноты, которые звучат ровно тогда, когда нужно.
3. Пример: агрегатор HTTP-запросов
Рассмотрим практическую задачу: у нас есть три источника данных (например, три разных сервера), и мы хотим получить ответ либо от всех (и агрегировать), либо от первого, кто ответит успешно.
Вариант 1: «Все должны быть успешны» (ShutdownOnFailure)
import jdk.incubator.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;
public class AggregatorAllSuccess {
public static void main(String[] args) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<String> f1 = scope.fork(() -> fetchFromSource1());
Future<String> f2 = scope.fork(() -> fetchFromSource2());
Future<String> f3 = scope.fork(() -> fetchFromSource3());
scope.join(); // ждем завершения всех задач
scope.throwIfFailed(); // если хоть одна упала — бросаем исключение
// Все задачи успешны — можно агрегировать результаты
String result = f1.resultNow() + f2.resultNow() + f3.resultNow();
System.out.println("Агрегированный результат: " + result);
}
}
static String fetchFromSource1() { /* ... */ return "A"; }
static String fetchFromSource2() { /* ... */ return "B"; }
static String fetchFromSource3() { /* ... */ return "C"; }
}
Что происходит:
- Все три задачи запускаются параллельно (в виртуальных потоках).
- Если хотя бы одна упала — остальные отменяются, бросается исключение.
- Если все успешны — можно безопасно агрегировать результаты.
Вариант 2: «Успех по первому валидному» (ShutdownOnSuccess)
import jdk.incubator.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;
public class AggregatorFirstSuccess {
public static void main(String[] args) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
Future<String> f1 = scope.fork(() -> fetchFromSource1());
Future<String> f2 = scope.fork(() -> fetchFromSource2());
Future<String> f3 = scope.fork(() -> fetchFromSource3());
scope.join(); // ждем первого успешного
scope.throwIfFailed(); // если все упали — бросаем исключение
String result = scope.result(); // результат первой успешной задачи
System.out.println("Первый успешный результат: " + result);
}
}
static String fetchFromSource1() { /* ... */ return "A"; }
static String fetchFromSource2() { /* ... */ return "B"; }
static String fetchFromSource3() { /* ... */ return "C"; }
}
Что происходит:
- Как только одна задача завершилась успешно — остальные отменяются.
- Если все упали — бросается исключение.
4. Автоматическая отмена и деградация
StructuredTaskScope сам заботится об отмене оставшихся задач, если политика этого требует. Например, если одна задача упала (ShutdownOnFailure) или одна успешно завершилась (ShutdownOnSuccess), остальные задачи получают сигнал отмены (interrupt).
Пример: корректное завершение с тайм-аутом
import jdk.incubator.concurrent.StructuredTaskScope;
import java.time.Instant;
import java.util.concurrent.Future;
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<String> f1 = scope.fork(() -> fetchWithTimeout());
Future<String> f2 = scope.fork(() -> fetchWithTimeout());
scope.joinUntil(Instant.now().plusSeconds(2)); // ждем максимум 2 секунды
scope.throwIfFailed();
String result = f1.resultNow() + f2.resultNow();
System.out.println(result);
}
Если задачи не завершились за 2 секунды — будет выброшено исключение, все задачи отменятся.
5. Ошибки и обработка исключений
Как исключения подзадач промаршрутизируются в scope
Иногда во время концерта кто-то всё-таки промахивается по нотам — StructuredTaskScope не делает вид, что ничего не произошло. Он аккуратно записывает, кто именно сыграл мимо кассы, и потом передаёт дирижёру полный отчёт. Когда вы вызываете throwIfFailed(), он бросает агрегированное исключение — что-то вроде сводного отчёта: «Вот список тех, кто сегодня выдал фальшивую ноту». Если нужно, можно развернуть это «дерево причин» и посмотреть, кто конкретно подвёл. А если хочется узнать про конкретного исполнителя — Future.exceptionNow() расскажет, чем именно закончилась его партия.
Когда отмена — это не провал
Важно помнить: отмена задачи не всегда значит ошибку. Если дирижёр сказал «всё, концерт окончен», то музыканты просто складывают инструменты — это cancelled, но не failed. Ошибкой считается только та ситуация, когда кто-то реально сыграл не то, и это исключение уже попадёт в общую сводку.
Пример: дерево причин
import jdk.incubator.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<String> f1 = scope.fork(() -> { throw new RuntimeException("Ошибка 1"); });
Future<String> f2 = scope.fork(() -> { throw new RuntimeException("Ошибка 2"); });
scope.join();
scope.throwIfFailed(); // бросит исключение с обеими причинами
} catch (Exception e) {
e.printStackTrace();
// Можно получить suppressed exceptions через e.getSuppressed()
}
6. Сравнение с CompletableFuture
StructuredTaskScope и CompletableFuture — оба позволяют запускать параллельные задачи, но:
- StructuredTaskScope удобен, когда задачи логически связаны и должны завершаться/отменяться вместе (иерархия задач).
- CompletableFuture хорош для композиции задач без иерархии (например, цепочки преобразований, реактивные сценарии).
Когда StructuredTaskScope упрощает код:
- Когда нужно гарантировать, что все подзадачи завершились перед выходом из блока.
- Когда нужна централизованная отмена и обработка ошибок.
- Когда важно, чтобы не осталось «висящих» задач.
Когда CompletableFuture удобнее:
- Когда задачи не связаны и могут жить своей жизнью.
- Когда нужна сложная композиция (thenCombine, thenCompose и т.д.).
7. Практика: агрегатор HTTP-запросов
Задача: отправить запросы к 3 источникам, получить первый успешный ответ
import jdk.incubator.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;
public class HttpAggregator {
public static void main(String[] args) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
Future<String> f1 = scope.fork(() -> httpRequest("https://api1.example.com"));
Future<String> f2 = scope.fork(() -> httpRequest("https://api2.example.com"));
Future<String> f3 = scope.fork(() -> httpRequest("https://api3.example.com"));
scope.join();
scope.throwIfFailed();
String result = scope.result();
System.out.println("Первый успешный ответ: " + result);
}
}
static String httpRequest(String url) throws Exception {
// Имитация запроса (можно использовать HttpClient)
Thread.sleep((long) (Math.random() * 1000));
if (Math.random() < 0.3) throw new RuntimeException("Ошибка запроса: " + url);
return "Ответ от " + url;
}
}
Задача: если одна подзадача упала — корректно гасим остальные
import jdk.incubator.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<String> f1 = scope.fork(() -> httpRequest("https://api1.example.com"));
Future<String> f2 = scope.fork(() -> httpRequest("https://api2.example.com"));
scope.join();
scope.throwIfFailed();
String result = f1.resultNow() + f2.resultNow();
System.out.println("Оба ответа: " + result);
} catch (Exception e) {
System.err.println("Ошибка в одной из задач: " + e.getMessage());
}
8. Типичные ошибки при работе с StructuredTaskScope
Ошибка №1: забыли вызвать join() или throwIfFailed().
Если не вызвать join(), задачи могут не завершиться до выхода из блока. Если не вызвать throwIfFailed(), ошибки подзадач останутся незамеченными.
Ошибка №2: попытка получить результат до завершения задачи.
Вызов resultNow() до завершения задачи бросит IllegalStateException. Сначала дождитесь завершения через join().
Ошибка №3: игнорирование отмены.
Если задача была отменена (например, из-за политики scope), не пытайтесь получить её результат — будет исключение.
Ошибка №4: смешивание разных политик завершения.
Не стоит пытаться вручную отменять задачи внутри scope — используйте политики ShutdownOnFailure или ShutdownOnSuccess.
Ошибка №5: запуск долгих CPU-bound задач в виртуальных потоках.
StructuredTaskScope по умолчанию использует виртуальные потоки — они идеальны для I/O-bound задач, но не ускоряют тяжёлые вычисления.
Ошибка №6: забыли закрыть scope (нет try-with-resources).
StructuredTaskScope реализует AutoCloseable — всегда используйте try-with-resources, чтобы гарантировать завершение всех задач.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ