1. Введение в AsyncStream
Если вы только начали привыкать к async/await, то соблазн такой: «Ну я же могу написать async-функцию и вернуть результат. Зачем ещё какая-то новая штука?». Проблема в том, что async-функция по своей природе заточена под один результат (или одну ошибку), а в реальной жизни часто нужны много результатов по мере готовности.
Представьте типичные ситуации в нашем CLI-проекте LibraryCLI:
Мы делаем команду, которая загружает много книг из сети или из файла. Мы не хотим молчать две минуты, а потом сказать «готово» (это UX уровня “микроволновка без таймера”). Нам хочется в процессе выводить: «загружено 1/50… 2/50… 3/50…». Или мы хотим отдавать найденные элементы по одному, чтобы пользователь мог начать работать с первыми результатами, не дожидаясь конца всей операции.
Старый мир (до async/await) обычно решал это callbacks’ами или делегатами: onItem, onProgress, onCompleted. И вот именно для такого «много значений во времени» стандартная библиотека Swift добавила AsyncStream — тип, который служит мостом между миром callbacks/делегатов и миром for await. В proposal по AsyncStream это прямо формулируется как цель: адаптировать APIs, которые выдают много значений «по событиям», к контракту AsyncSequence.
Важное ощущение, которое я хочу, чтобы у вас появилось: AsyncStream — это не магия и не «ещё один вид коллекций». Это просто удобный способ сказать: «Вот источник, который будет иногда публиковать значения, а вот потребитель, который будет их аккуратно читать через for await».
Producer и consumer: где живёт Continuation
Чтобы нормально пользоваться AsyncStream, полезно мысленно разделить систему на две роли. Первая роль — consumer (потребитель): это код, который читает значения через for await и делает полезную работу (печатает, обновляет состояние, копит статистику). Вторая роль — producer (производитель): это код, который эти значения порождает (читает файл, выполняет запросы, слушает события, считает прогресс).
В AsyncStream эти роли разведены на уровне типов. Consumer держит в руках сам AsyncStream<Element> (его можно перебирать). Producer держит в руках AsyncStream<Element>.Continuation (в него можно “писать”). В стандартном описании AsyncStream прямо подчёркивается: значения yield-ятся через continuation, а потребляются через интерфейс AsyncSequence.
Почти всегда эта схема выглядит так:
flowchart LR
P[Producer<br/>генерирует события] -->|"yield(value)"| C[Continuation<br/>пишущая сторона]
C --> S[AsyncStream<Element><br/>читающая сторона]
S -->|for await| U[Consumer<br/>обрабатывает события]
Да, между continuation и consumer внутри есть буфер и разные нюансы поведения, но сегодня нам важно другое: вы создаёте поток, отдаёте наружу “читающую” сторону, а “пишущую” сторону используете внутри, чтобы публиковать элементы.
И ещё одно: AsyncStream исторически задуман именно как «корневая» асинхронная последовательность — источник значений. Это не трансформация, не map, не filter, а место, где поток начинается.
2. Базовая форма AsyncStream
Теперь соберём минимальную «болванку». Технически AsyncStream создаётся конструктором, куда вы передаёте замыкание. Это замыкание получает continuation. Всё, что вы сделаете с continuation — это и есть ваш producer-код.
В самом простом виде мы можем сделать поток из нескольких значений «прямо сейчас». Это не самый жизненный пример, но он даёт правильное ощущение: yield публикует элементы, finish() сообщает «всё, конец». В документации proposal подчёркивается, что finish() — обязательный сигнал завершения: без него consumer может ждать следующий элемент бесконечно.
import Foundation
func makeHelloStream() -> AsyncStream<String> {
AsyncStream { continuation in
continuation.yield("Hello")
continuation.yield("Async")
continuation.yield("Stream")
continuation.finish()
}
}
Здесь producer синхронный: он за один заход “выплюнул” три значения и завершил поток.
Consumer (мы это делали в прошлой лекции, но покажу коротко для связности) выглядит так:
func consumeHelloStream() async {
for await word in makeHelloStream() {
print(word)
}
print("done")
}
Обратите внимание на две вещи.
Первая: finish() — это не «опционально». Если вы забудете его, цикл for await просто не узнает, что «всё закончилось» (а значит может ждать вечно).
Вторая: замыкание создания стрима (AsyncStream { ... }) само по себе должно быть быстрым. Если вы внутри него начнёте делать долгую работу (читать гигабайтный файл, ходить в сеть), вы получите очень неприятную смесь: поток ещё не создан, а программа уже «зависла». Поэтому типовой приём — запускать долгую producer-работу в отдельной Task.
3. Producer внутри Task: значения приходят со временем
Сейчас сделаем тот вариант, ради которого AsyncStream чаще всего и берут: поток начинает жить, producer выдаёт значения постепенно, а consumer читает их по мере появления. Это похоже на конвейер: на входе кто-то кладёт коробки на ленту, на выходе кто-то эти коробки разбирает. Главное — не путать: лента сама по себе не ускоряет работу, она делает её более плавной и предсказуемой.
Начнём с простого: поток «тиков», который раз в 200 мс публикует число.
import Foundation
func makeTickStream(count: Int) -> AsyncStream<Int> {
AsyncStream { continuation in
Task {
for i in 1...count {
continuation.yield(i)
try await Task.sleep(nanoseconds: 200_000_000)
}
continuation.finish()
}
}
}
Здесь есть три ключевые строки, которые стоит запомнить «на уровне мышечной памяти»:
Мы создаём AsyncStream { continuation in ... }, внутри сразу создаём Task { ... }, а дальше много раз вызываем continuation.yield(...), и в конце continuation.finish().
Почему Task здесь уместен? Потому что Task.sleep — это async-операция. А замыкание AsyncStream { ... } не async, оно не позволяет await. Поэтому мы и «уходим» в Task, где уже можно писать try await.
Если хотим сделать код чуть безопаснее (чтобы при любой причине выхода из producer-кода поток всё равно завершился), удобно поставить defer на finish(). Это не «обязательная магия», просто хороший стиль.
import Foundation
func makeSafeTickStream(count: Int) -> AsyncStream<Int> {
AsyncStream { continuation in
Task {
defer { continuation.finish() }
for i in 1...count { continuation.yield(i) }
}
}
}
Да, тут нет задержки — зато видно, как defer превращает завершение потока в «гарантированное». И это полезно, потому что забыть finish() — одна из самых частых ошибок при первом знакомстве. А стрим без finish() — как сериал без последней серии: все ждут, а автор ушёл за кофе.
4. Адаптация callback-API в AsyncStream
Теперь перейдём к самой “вкусной” причине, почему AsyncStream вообще существует: адаптация старых API, которые работают на callbacks. В proposal по AsyncStream подчёркивается, что это как раз “bridge” для случаев, когда значения приходят через callback/делегат много раз, а не возвращаются одной функцией.
Сделаем маленький учебный «легаси-источник», который вызывает onValue несколько раз, а потом onDone. Да, он будет на Task, но суть та же, что и у делегатов/нотификейшенов: события приходят сами, когда захотят.
import Foundation
final class LegacyEmitter {
var onValue: ((Int) -> Void)?
var onDone: (() -> Void)?
func start() {
Task {
for i in 1...3 { onValue?(i) }
onDone?()
}
}
}
Теперь пишем адаптер. Идея простая: в момент создания стрима мы «подписываемся» на callbacks и внутри callbacks делаем yield и finish.
func makeStream(from emitter: LegacyEmitter) -> AsyncStream<Int> {
AsyncStream { continuation in
emitter.onValue = { continuation.yield($0) }
emitter.onDone = { continuation.finish() }
emitter.start()
}
}
Вот здесь происходит очень важное «перекодирование мозга»:
Callback-модель обычно заставляет вас писать обработку прямо внутри onValue, то есть в producer-контексте. А AsyncStream позволяет сделать иначе: producer просто публикует значения, а consumer решает, что с ними делать, в обычном линейном коде for await.
Например, consumer теперь может выглядеть максимально читаемо:
func consumeLegacyAsStream() async {
let emitter = LegacyEmitter()
for await v in makeStream(from: emitter) {
print("value:", v) // value: 1, затем 2, затем 3
}
print("done")
}
И отдельно подчеркну нюанс про время жизни. В примере выше emitter создаётся вне makeStream и передаётся внутрь — так мы гарантируем, что источник событий живёт достаточно долго. Если создать LegacyEmitter() прямо внутри AsyncStream { ... } и нигде не хранить, можно случайно получить ситуацию «источник умер, а consumer ждёт». В реальном коде это один из самых неприятных багов: он выглядит как “иногда стрим работает, иногда молчит”, а причина — банально объект деинициализировался слишком рано.
5. Пример для LibraryCLI: поток логов по мере готовности
Чтобы AsyncStream не остался «примером в вакууме», давайте встроим его в наш CLI-проект в духе курса. Мы не будем придумывать сложную бизнес-логику (у нас для этого есть сервисы/репозитории), а сделаем маленькую демонстрационную команду, которая показывает поток сообщений: «обрабатываю книгу №…». Это похоже на будущий прогресс, но пока оставим события простыми строками.
Сначала добавим функцию, которая создаёт поток строк. Она имитирует работу: спит чуть-чуть и публикует очередную строку.
import Foundation
func makeImportLogStream(total: Int) -> AsyncStream<String> {
AsyncStream { continuation in
Task {
for i in 1...total {
try await Task.sleep(nanoseconds: 150_000_000)
continuation.yield("Imported book \(i)/\(total)")
}
continuation.finish()
}
}
}
Теперь представим, что у нас уже есть разбор команды (парсер) и обработчик команд (application service), как мы строили это раньше в курсе. В обработчике команды мы можем просто «прочитать» этот поток и печатать строки. Это получается удивительно линейно: никаких callbacks, никаких «держи счётчик и не забудь закрыть», просто цикл.
func runImportDemo(total: Int) async {
for await line in makeImportLogStream(total: total) {
print(line)
}
print("Import completed")
}
Если вы сейчас мысленно сравните это с callback-подходом, разница будет очень заметна. В callback-подходе вы бы передали onProgress, onDone, onError и начали бы городить «состояние процесса» снаружи. А тут у вас поток значений, и вы его читаете как обычную последовательность — просто с await.
И вот ещё один момент, который полезно проговорить: AsyncStream — это не обязательно «вещь уровня UI». Для CLI это тоже отлично подходит: вывод прогресса, постепенная обработка элементов, поток диагностических сообщений. Конкретно для LibraryCLI это становится особенно удобным, когда вы хотите печатать что-то “по мере того, как оно происходит”, но при этом сохранять архитектурное разделение “кто производит события” и “кто их выводит”.
6. Типичные ошибки при создании AsyncStream
Ошибка №1: забыть вызвать finish().
Это самый популярный «первый синяк» в теме. Consumer читает for await, получает несколько значений, а потом… тишина. Кажется, что цикл «завис», хотя на самом деле он честно ждёт следующий элемент. По контракту AsyncStream завершение итерации наступает только после finish() (после выдачи всех буферизированных элементов). Если боитесь забыть — используйте defer { continuation.finish() } в producer-Task.
Ошибка №2: делать долгую работу прямо внутри AsyncStream { ... }, без Task.
Замыкание конструктора стрима не async, и если вы начнёте там “долго считать”, вы получите странный UX: поток ещё не создан, consumer ещё не может начать чтение, а программа уже занята. В результате кажется, что стрим «не работает», хотя проблема в том, что вы построили его как синхронную блокировку.
Ошибка №3: терять время жизни источника событий.
Если вы адаптируете callback-объект и создаёте его внутри конструктора стрима, легко случайно потерять ссылку на него, и он будет деинициализирован раньше, чем успеет прислать события. Симптом обычно такой: “иногда приходит 1–2 события, иногда ноль”. Лечится просто: источник должен жить хотя бы столько, сколько вы хотите получать события, и это должно быть видно в коде (в идеале — хранить источник явно).
Ошибка №4: пытаться читать один и тот же AsyncStream несколькими независимыми итераторами “как массив”.
Интуиция новичка часто говорит: «Ну это же sequence, значит можно пройтись два раза». Но AsyncStream — это поток событий, и повторная итерация может дать неожиданные результаты. В proposal даже отдельно предупреждают, что многократная итерация или несколько итераторов могут привести к неожиданной последовательности значений. Практическое правило простое: относитесь к стриму как к “одному прослушиванию”, а не как к “коллекции для переиспользования”.
Ошибка №5: смешивать ответственность producer и consumer (особенно через print внутри producer).
Очень хочется писать print("yielded \(i)") рядом с yield(i), потому что “так проще отлаживать”. Для отладки — нормально, но как дизайн — плохая привычка. Producer должен публиковать значения, consumer — решать, что с ними делать. Иначе вы быстро получите код, где вывод размазан по разным слоям и его невозможно контролировать (например, отключить прогресс-лог, не ломая бизнес-логику).
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ