JavaRush /Курсы /Swift SELF /AsyncStream — создаём поток событий руками

AsyncStream — создаём поток событий руками

Swift SELF
71 уровень , 2 лекция
Открыта

1. Введение в AsyncStream

Если вы только начали привыкать к async/await, то соблазн такой: «Ну я же могу написать async-функцию и вернуть результат. Зачем ещё какая-то новая штука?». Проблема в том, что async-функция по своей природе заточена под один результат (или одну ошибку), а в реальной жизни часто нужны много результатов по мере готовности.

Представьте типичные ситуации в нашем CLI-проекте LibraryCLI:

Мы делаем команду, которая загружает много книг из сети или из файла. Мы не хотим молчать две минуты, а потом сказать «готово» (это UX уровня “микроволновка без таймера”). Нам хочется в процессе выводить: «загружено 1/50… 2/50… 3/50…». Или мы хотим отдавать найденные элементы по одному, чтобы пользователь мог начать работать с первыми результатами, не дожидаясь конца всей операции.

Старый мир (до async/await) обычно решал это callbacks’ами или делегатами: onItem, onProgress, onCompleted. И вот именно для такого «много значений во времени» стандартная библиотека Swift добавила AsyncStream — тип, который служит мостом между миром callbacks/делегатов и миром for await. В proposal по AsyncStream это прямо формулируется как цель: адаптировать APIs, которые выдают много значений «по событиям», к контракту AsyncSequence.

Важное ощущение, которое я хочу, чтобы у вас появилось: AsyncStream — это не магия и не «ещё один вид коллекций». Это просто удобный способ сказать: «Вот источник, который будет иногда публиковать значения, а вот потребитель, который будет их аккуратно читать через for await».

Producer и consumer: где живёт Continuation

Чтобы нормально пользоваться AsyncStream, полезно мысленно разделить систему на две роли. Первая роль — consumer (потребитель): это код, который читает значения через for await и делает полезную работу (печатает, обновляет состояние, копит статистику). Вторая роль — producer (производитель): это код, который эти значения порождает (читает файл, выполняет запросы, слушает события, считает прогресс).

В AsyncStream эти роли разведены на уровне типов. Consumer держит в руках сам AsyncStream<Element> (его можно перебирать). Producer держит в руках AsyncStream<Element>.Continuation (в него можно “писать”). В стандартном описании AsyncStream прямо подчёркивается: значения yield-ятся через continuation, а потребляются через интерфейс AsyncSequence.

Почти всегда эта схема выглядит так:

flowchart LR
    P[Producer<br/>генерирует события] -->|"yield(value)"| C[Continuation<br/>пишущая сторона]
    C --> S[AsyncStream<Element><br/>читающая сторона]
    S -->|for await| U[Consumer<br/>обрабатывает события]

Да, между continuation и consumer внутри есть буфер и разные нюансы поведения, но сегодня нам важно другое: вы создаёте поток, отдаёте наружу “читающую” сторону, а “пишущую” сторону используете внутри, чтобы публиковать элементы.

И ещё одно: AsyncStream исторически задуман именно как «корневая» асинхронная последовательность — источник значений. Это не трансформация, не map, не filter, а место, где поток начинается.

2. Базовая форма AsyncStream

Теперь соберём минимальную «болванку». Технически AsyncStream создаётся конструктором, куда вы передаёте замыкание. Это замыкание получает continuation. Всё, что вы сделаете с continuation — это и есть ваш producer-код.

В самом простом виде мы можем сделать поток из нескольких значений «прямо сейчас». Это не самый жизненный пример, но он даёт правильное ощущение: yield публикует элементы, finish() сообщает «всё, конец». В документации proposal подчёркивается, что finish() — обязательный сигнал завершения: без него consumer может ждать следующий элемент бесконечно.


import Foundation

func makeHelloStream() -> AsyncStream<String> {
    AsyncStream { continuation in
        continuation.yield("Hello")
        continuation.yield("Async")
        continuation.yield("Stream")
        continuation.finish()
    }
}

Здесь producer синхронный: он за один заход “выплюнул” три значения и завершил поток.

Consumer (мы это делали в прошлой лекции, но покажу коротко для связности) выглядит так:

func consumeHelloStream() async {
    for await word in makeHelloStream() {
        print(word)
    }
    print("done")
}

Обратите внимание на две вещи.

Первая: finish() — это не «опционально». Если вы забудете его, цикл for await просто не узнает, что «всё закончилось» (а значит может ждать вечно).

Вторая: замыкание создания стрима (AsyncStream { ... }) само по себе должно быть быстрым. Если вы внутри него начнёте делать долгую работу (читать гигабайтный файл, ходить в сеть), вы получите очень неприятную смесь: поток ещё не создан, а программа уже «зависла». Поэтому типовой приём — запускать долгую producer-работу в отдельной Task.

3. Producer внутри Task: значения приходят со временем

Сейчас сделаем тот вариант, ради которого AsyncStream чаще всего и берут: поток начинает жить, producer выдаёт значения постепенно, а consumer читает их по мере появления. Это похоже на конвейер: на входе кто-то кладёт коробки на ленту, на выходе кто-то эти коробки разбирает. Главное — не путать: лента сама по себе не ускоряет работу, она делает её более плавной и предсказуемой.

Начнём с простого: поток «тиков», который раз в 200 мс публикует число.

import Foundation

func makeTickStream(count: Int) -> AsyncStream<Int> {
    AsyncStream { continuation in
        Task {
            for i in 1...count {
                continuation.yield(i)
                try await Task.sleep(nanoseconds: 200_000_000)
            }
            continuation.finish()
        }
    }
}

Здесь есть три ключевые строки, которые стоит запомнить «на уровне мышечной памяти»:

Мы создаём AsyncStream { continuation in ... }, внутри сразу создаём Task { ... }, а дальше много раз вызываем continuation.yield(...), и в конце continuation.finish().

Почему Task здесь уместен? Потому что Task.sleep — это async-операция. А замыкание AsyncStream { ... } не async, оно не позволяет await. Поэтому мы и «уходим» в Task, где уже можно писать try await.

Если хотим сделать код чуть безопаснее (чтобы при любой причине выхода из producer-кода поток всё равно завершился), удобно поставить defer на finish(). Это не «обязательная магия», просто хороший стиль.

import Foundation

func makeSafeTickStream(count: Int) -> AsyncStream<Int> {
    AsyncStream { continuation in
        Task {
            defer { continuation.finish() }
            for i in 1...count { continuation.yield(i) }
        }
    }
}

Да, тут нет задержки — зато видно, как defer превращает завершение потока в «гарантированное». И это полезно, потому что забыть finish() — одна из самых частых ошибок при первом знакомстве. А стрим без finish() — как сериал без последней серии: все ждут, а автор ушёл за кофе.

4. Адаптация callback-API в AsyncStream

Теперь перейдём к самой “вкусной” причине, почему AsyncStream вообще существует: адаптация старых API, которые работают на callbacks. В proposal по AsyncStream подчёркивается, что это как раз “bridge” для случаев, когда значения приходят через callback/делегат много раз, а не возвращаются одной функцией.

Сделаем маленький учебный «легаси-источник», который вызывает onValue несколько раз, а потом onDone. Да, он будет на Task, но суть та же, что и у делегатов/нотификейшенов: события приходят сами, когда захотят.

import Foundation

final class LegacyEmitter {
    var onValue: ((Int) -> Void)?
    var onDone: (() -> Void)?

    func start() {
        Task {
            for i in 1...3 { onValue?(i) }
            onDone?()
        }
    }
}

Теперь пишем адаптер. Идея простая: в момент создания стрима мы «подписываемся» на callbacks и внутри callbacks делаем yield и finish.

func makeStream(from emitter: LegacyEmitter) -> AsyncStream<Int> {
    AsyncStream { continuation in
        emitter.onValue = { continuation.yield($0) }
        emitter.onDone = { continuation.finish() }
        emitter.start()
    }
}

Вот здесь происходит очень важное «перекодирование мозга»:

Callback-модель обычно заставляет вас писать обработку прямо внутри onValue, то есть в producer-контексте. А AsyncStream позволяет сделать иначе: producer просто публикует значения, а consumer решает, что с ними делать, в обычном линейном коде for await.

Например, consumer теперь может выглядеть максимально читаемо:

func consumeLegacyAsStream() async {
    let emitter = LegacyEmitter()
    for await v in makeStream(from: emitter) {
        print("value:", v) // value: 1, затем 2, затем 3
    }
    print("done")
}

И отдельно подчеркну нюанс про время жизни. В примере выше emitter создаётся вне makeStream и передаётся внутрь — так мы гарантируем, что источник событий живёт достаточно долго. Если создать LegacyEmitter() прямо внутри AsyncStream { ... } и нигде не хранить, можно случайно получить ситуацию «источник умер, а consumer ждёт». В реальном коде это один из самых неприятных багов: он выглядит как “иногда стрим работает, иногда молчит”, а причина — банально объект деинициализировался слишком рано.

5. Пример для LibraryCLI: поток логов по мере готовности

Чтобы AsyncStream не остался «примером в вакууме», давайте встроим его в наш CLI-проект в духе курса. Мы не будем придумывать сложную бизнес-логику (у нас для этого есть сервисы/репозитории), а сделаем маленькую демонстрационную команду, которая показывает поток сообщений: «обрабатываю книгу №…». Это похоже на будущий прогресс, но пока оставим события простыми строками.

Сначала добавим функцию, которая создаёт поток строк. Она имитирует работу: спит чуть-чуть и публикует очередную строку.

import Foundation

func makeImportLogStream(total: Int) -> AsyncStream<String> {
    AsyncStream { continuation in
        Task {
            for i in 1...total {
                try await Task.sleep(nanoseconds: 150_000_000)
                continuation.yield("Imported book \(i)/\(total)")
            }
            continuation.finish()
        }
    }
}

Теперь представим, что у нас уже есть разбор команды (парсер) и обработчик команд (application service), как мы строили это раньше в курсе. В обработчике команды мы можем просто «прочитать» этот поток и печатать строки. Это получается удивительно линейно: никаких callbacks, никаких «держи счётчик и не забудь закрыть», просто цикл.

func runImportDemo(total: Int) async {
    for await line in makeImportLogStream(total: total) {
        print(line)
    }
    print("Import completed")
}

Если вы сейчас мысленно сравните это с callback-подходом, разница будет очень заметна. В callback-подходе вы бы передали onProgress, onDone, onError и начали бы городить «состояние процесса» снаружи. А тут у вас поток значений, и вы его читаете как обычную последовательность — просто с await.

И вот ещё один момент, который полезно проговорить: AsyncStream — это не обязательно «вещь уровня UI». Для CLI это тоже отлично подходит: вывод прогресса, постепенная обработка элементов, поток диагностических сообщений. Конкретно для LibraryCLI это становится особенно удобным, когда вы хотите печатать что-то “по мере того, как оно происходит”, но при этом сохранять архитектурное разделение “кто производит события” и “кто их выводит”.

6. Типичные ошибки при создании AsyncStream

Ошибка №1: забыть вызвать finish().
Это самый популярный «первый синяк» в теме. Consumer читает for await, получает несколько значений, а потом… тишина. Кажется, что цикл «завис», хотя на самом деле он честно ждёт следующий элемент. По контракту AsyncStream завершение итерации наступает только после finish() (после выдачи всех буферизированных элементов). Если боитесь забыть — используйте defer { continuation.finish() } в producer-Task.

Ошибка №2: делать долгую работу прямо внутри AsyncStream { ... }, без Task.
Замыкание конструктора стрима не async, и если вы начнёте там “долго считать”, вы получите странный UX: поток ещё не создан, consumer ещё не может начать чтение, а программа уже занята. В результате кажется, что стрим «не работает», хотя проблема в том, что вы построили его как синхронную блокировку.

Ошибка №3: терять время жизни источника событий.
Если вы адаптируете callback-объект и создаёте его внутри конструктора стрима, легко случайно потерять ссылку на него, и он будет деинициализирован раньше, чем успеет прислать события. Симптом обычно такой: “иногда приходит 1–2 события, иногда ноль”. Лечится просто: источник должен жить хотя бы столько, сколько вы хотите получать события, и это должно быть видно в коде (в идеале — хранить источник явно).

Ошибка №4: пытаться читать один и тот же AsyncStream несколькими независимыми итераторами “как массив”.
Интуиция новичка часто говорит: «Ну это же sequence, значит можно пройтись два раза». Но AsyncStream — это поток событий, и повторная итерация может дать неожиданные результаты. В proposal даже отдельно предупреждают, что многократная итерация или несколько итераторов могут привести к неожиданной последовательности значений. Практическое правило простое: относитесь к стриму как к “одному прослушиванию”, а не как к “коллекции для переиспользования”.

Ошибка №5: смешивать ответственность producer и consumer (особенно через print внутри producer).
Очень хочется писать print("yielded \(i)") рядом с yield(i), потому что “так проще отлаживать”. Для отладки — нормально, но как дизайн — плохая привычка. Producer должен публиковать значения, consumer — решать, что с ними делать. Иначе вы быстро получите код, где вывод размазан по разным слоям и его невозможно контролировать (например, отключить прогресс-лог, не ломая бизнес-логику).

1
Задача
Swift SELF, 71 уровень, 2 лекция
Недоступна
Приветственный поток
Приветственный поток
1
Задача
Swift SELF, 71 уровень, 2 лекция
Недоступна
Тики с задержкой
Тики с задержкой
1
Задача
Swift SELF, 71 уровень, 2 лекция
Недоступна
Мост колбэков
Мост колбэков
1
Задача
Swift SELF, 71 уровень, 2 лекция
Недоступна
Прогресс обработки
Прогресс обработки
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ