JavaRush /Курсы /Swift SELF /Буферизация AsyncStream: bufferingNewest

Буферизация AsyncStream: bufferingNewest

Swift SELF
71 уровень , 3 лекция
Открыта

1. Проблема буфера: producer быстрее consumer

Когда вы впервые пишете AsyncStream, он кажется почти магией: вы «йелдите» значения, а где-то в другой части кода они красиво приходят в for await. Но как только вы добавляете реальный тайминг (сеть, дисковые операции, печать в консоль), выясняется неприятная вещь: producer и consumer живут в разных скоростях. Producer может «насыпать» событий быстро, а consumer — медленно их обработать.

Представьте, что producer — это курьер, который приносит уведомления, а consumer — это вы, которые эти уведомления читаете. Если курьер приносит по уведомлению каждую миллисекунду, а вы читаете по одному разу в секунду, то где-то должна образоваться куча непрочитанных бумажек. Эта куча и есть буфер. В AsyncStream буфер хранит значения, которые уже опубликованы, но ещё не потреблены итерацией: «буферизация только для значений, которые ещё не потреблены» — это важная часть модели.

Если буфер не ограничивать, вы можете получить два эффекта одновременно: приложение «вроде работает», но память растёт, как список задач у студента перед сессией. Поэтому у AsyncStream есть политика буферизации: сколько элементов можно держать и что делать при переполнении.

Чтобы увидеть проблему на пальцах, давайте сделаем почти нарочито «плохую» ситуацию: producer очень быстрый, consumer очень медленный (например, печатает в консоль с задержкой).


import Foundation

func fastInts() -> AsyncStream<Int> {
    AsyncStream(bufferingPolicy: .unbounded) { c in
        Task {
            for i in 1...10_000 { c.yield(i) }
            c.finish()
        }
    }
}

Код корректный, но если consumer делает тяжёлую обработку на каждый элемент, поток может накопить кучу элементов. И это не баг AsyncStream: это честная цена за «принял событие, но ещё не обработал».

2. Политики буферизации AsyncStream

Теперь, когда у нас появилась «куча бумажек», хочется договориться: сколько бумажек мы готовы держать на столе, и какие выбрасывать, если стол закончился. Именно это и описывает AsyncStream.Continuation.BufferingPolicy. У этой политики есть три основных варианта: .unbounded, .bufferingOldest(n), .bufferingNewest(n). По смыслу это режимы переполнения буфера.

.unbounded означает «буфер без ограничений». Это просто, удобно для учебных примеров и редких событий, но опасно для частых событий: если consumer отстаёт, память растёт.

.bufferingOldest(n) означает «когда буфер заполнен, новые элементы выбрасываются». То есть мы гарантируем, что сохраняем уже накопленное (самое старое в очереди) до размера n. Это полезно, когда важна «история», и вы скорее согласны потерять новые события, чем выбросить то, что уже накопили.

.bufferingNewest(n) означает «когда буфер заполнен, выбрасываем самый старый элемент из буфера и кладём новый». То есть мы гарантируем, что в буфере остаются самые свежие n событий. Это часто идеально для «состояния» и «прогресса», когда вам важнее текущее положение дел, чем каждый микрошаг.

Соберём это в небольшую таблицу, чтобы мозгу было легче:

Политика Что хранит буфер при переполнении Что теряем Типичный смысл
.unbounded
Всё (без лимита) Ничего, но рискуем памятью Редкие события, учебные примеры
.bufferingOldest(n)
Старые элементы (до n) Новые события «История важнее свежести»
.bufferingNewest(n)
Самые новые элементы (до n) Старые события «Важно текущее состояние»

Ещё один нюанс: если указать 0, поток фактически становится «дропающим»: значение будет отброшено, если в этот момент никто не ждёт next() (грубо: «нет спроса — нет поставки»). Как технический инструмент это иногда полезно.

3. bufferingNewest и bufferingOldest: как выбирать

На практике выбор политики — это не «какая быстрее», а «какая семантически честнее». Это как решить, что показывать пользователю: все промежуточные проценты загрузки или только последнее состояние. Помогает простой вопрос: событие — это “факт” или “снимок состояния”?

Если событие — “факт”, его потеря меняет смысл. Например: «книга добавлена в библиотеку», «операция удаления выполнена», «получен ответ сервера с данными». Такие события обычно нельзя терять, иначе вы пропустите часть логики. Для них либо нужен достаточно большой буфер, либо вообще другой дизайн потока (в рамках сегодняшней лекции мы не уходим в продвинутые стратегии согласования скоростей, просто фиксируем, что для фактов потеря болезненна). В таких случаях .bufferingOldest может показаться «безопаснее», потому что не выбрасывает уже накопленное, но он будет выбрасывать новые факты, и это тоже может быть недопустимо. Так что чаще для фактов либо увеличивают лимит, либо делают consumer быстрее, либо уменьшают частоту событий.

Если событие — “снимок состояния”, потеря части снимков обычно не страшна. Прогресс 3/100, затем 4/100, затем 5/100 — это типичный «снимок». Если пользователь увидит сразу 50/100, а промежуточные шаги пропадут — смысл не ломается: мы всё равно движемся вперёд. Именно поэтому прогресс чаще всего дружит с .bufferingNewest(1) или .bufferingNewest(2).

Чтобы закрепить разницу, представим буфер как маленькую очередь из трёх мест:

flowchart LR
    P[producer yield] --> B[(буфер на 3 элемента)]
    B --> C[consumer for await]

    subgraph Overflow["переполнение"]
      O1["bufferingOldest(3):
новое событие выбрасываем"] O2["bufferingNewest(3):
старое событие вытесняем"] end

В bufferingNewest вы словно говорите: «мне не нужна вся очередь сообщений, мне нужна последняя актуальная информация». В bufferingOldest вы говорите: «раз я уже начал собирать историю, не трогайте её, лучше не принимайте новые».

4. yield(...) и YieldResult: что произошло с событием

Частая ошибка новичка в асинхронности — думать, что yield «гарантирует доставку». На самом деле yield гарантирует другое: «я попытался отдать значение в поток, вот результат попытки». Поэтому yield возвращает YieldResult, который может сообщить, что значение попало в буфер (или сразу разбудило ожидающий next()), либо было отброшено из‑за заполненного буфера, либо поток уже в терминальном состоянии.

У AsyncStream.Continuation.YieldResult есть три кейса:

  • .enqueued(remaining: Int) — значение принято, а remaining показывает, сколько слотов в буфере оставалось на момент yield (полезно для диагностики; не стоит строить на этом сложную логику, если вы не уверены в эксклюзивности вызовов).
  • .dropped(Element) — значение не попало в буфер из‑за переполнения, и вам вернули «кого именно» выбросили (это информативно и при .bufferingOldest, и при .bufferingNewest).
  • .terminated — поток уже завершён (через finish() или из‑за отмены), и принимать новые значения смысла нет.

Мини‑пример, который печатает «что случилось» с событием:

import Foundation

func debugYield(_ value: Int, into c: AsyncStream<Int>.Continuation) {
    switch c.yield(value) {
    case .enqueued(let remaining):
        print("enqueued, remaining:", remaining) // например: enqueued, remaining: 7
    case .dropped(let dropped):
        print("dropped:", dropped)               // dropped: 42
    case .terminated:
        print("terminated")                      // terminated
    }
}

Важно: yield возвращается сразу и не «ждёт», пока consumer обработает элемент. Это часть контракта AsyncStream: он адаптирует системы без backpressure (например, callback’и), поэтому «поставщик» не обязан синхронно подстраиваться под «потребителя». Буфер и политика выбрасывания — это компромисс между мирами «мне сообщили событие» и «я успел обработать событие».

5. Практика: прогресс как поток и bufferingNewest(1)

Сейчас вернёмся к нашему учебному CLI‑приложению LibraryCLI. Прогресс — классическое место, где producer «сыпет» часто, а consumer (печать в консоль) работает медленно, потому что stdout — штука не бесконечно быстрая.

Заведём событие прогресса, максимально простое и типизированное:

enum ProgressEvent {
    case started(total: Int)
    case advanced(done: Int, total: Int)
    case completed
}

Теперь сделаем поток прогресса, который публикует события часто, например на каждом шаге. И здесь мы сознательно выберем .bufferingNewest(1): нам важно показывать «последний прогресс», а не каждую миллисекунду печатать 1/100, 2/100, 3/100… пока пользователь уже устал жить.

import Foundation

func makeProgressStream(total: Int) -> AsyncStream<ProgressEvent> {
    AsyncStream(bufferingPolicy: .bufferingNewest(1)) { c in
        Task {
            c.yield(.started(total: total))
            for done in 1...total { c.yield(.advanced(done: done, total: total)) }
            c.yield(.completed)
            c.finish()
        }
    }
}

Смысл .bufferingNewest(1) здесь скорее UX‑ный: если consumer (печать) не успевает, мы не хотим накапливать историю, мы хотим всегда иметь самое свежее значение.

Consumer, который «рендерит» событие в строку:

func render(_ e: ProgressEvent) -> String {
    switch e {
    case .started(let total): return "Start: 0/\(total)"
    case .advanced(let done, let total): return "Progress: \(done)/\(total)"
    case .completed: return "Done"
    }
}

И потребление через for await:

import Foundation

func showProgress() async {
    for await event in makeProgressStream(total: 5) {
        print(render(event))
    }
    print("Report: completed") // Report: completed
}

Эти куски специально маленькие: цель — почувствовать механизм буфера, а не утонуть в инфраструктуре CLI.

6. Полезные нюансы: unbounded и диагностика потерь

Чем опасен .unbounded для частых событий

Очень легко начать с .unbounded, потому что «ну пусть всё хранится, память же большая». А потом вы случайно делаете поток событий, который публикует быстрее, чем вы печатаете или обрабатываете, и приложение начинает вести себя странно: вывод отстаёт на секунды, потом на минуты, а потом вы печатаете прогресс 1% тогда, когда операция уже завершилась. При этом память может постепенно увеличиваться.

AsyncStream при такой настройке действительно буферизует элементы до тех пор, пока consumer их не заберёт — это ожидаемо для многих адаптируемых API (уведомления, записи БД и т.п.). Но «ожидаемо» не значит «всегда хорошо». Для прогресса и UI‑подобных индикаторов не нужно сохранять всё.

Если хотите увидеть эффект в миниатюре, можно искусственно замедлить consumer:

import Foundation

func slowConsumer() async {
    for await e in makeProgressStream(total: 50) {
        print(render(e))
        try? await Task.sleep(nanoseconds: 50_000_000) // 0.05s
    }
}

Если producer «насыпал» все 50 событий мгновенно, то consumer будет их печатать с задержкой. С .bufferingNewest(1) вы будете печатать более актуальные значения (и часть пропустите), а с .unbounded вы честно распечатаете всё — но с огромным лагом.

Логируем .dropped, но без паники

Когда вы включаете bufferingNewest или bufferingOldest, вы должны заранее принять, что некоторые события будут теряться. Это не трагедия, если события по смыслу «теряемые» (прогресс, обновление состояния). Но иногда полезно диагностировать: «мы вообще теряем события или consumer успевает?».

Здесь пригодится YieldResult. Добавим версию producer, которая считает, сколько раз что-то было отброшено.

import Foundation

func makeDebugProgress(total: Int) -> AsyncStream<ProgressEvent> {
    AsyncStream(bufferingPolicy: .bufferingNewest(1)) { c in
        Task {
            var droppedCount = 0
            for done in 1...total {
                if case .dropped = c.yield(.advanced(done: done, total: total)) {
                    droppedCount += 1
                }
            }
            print("dropped:", droppedCount) // dropped: ...
            c.finish()
        }
    }
}

И ещё один нюанс: continuation потокобезопасен, и вызовы yield/finish сериализуются, но если вы вызываете yield из нескольких конкурентных мест, порядок доставки может стать неожиданным. Мы в этой лекции не строим сложные конкурентные producer’ы, но как «запах проблемы» это стоит запомнить: один producer‑Task проще для понимания и дебага.

7. Типичные ошибки

Ошибка №1: выбирать политику «наугад», не определив смысл события.
Когда разработчик ставит .bufferingNewest(1) «потому что так короче», он может случайно потерять действительно важные элементы (например, события доменной логики). Политика буфера — это не оптимизация, а часть смысла потока: для прогресса и состояния подходит «держим последнее», а для фактов чаще нужен другой подход, где потеря недопустима.

Ошибка №2: ожидать, что yield гарантирует доставку каждого элемента.
yield может вернуть .dropped(...) или .terminated, и это нормальная часть API: у AsyncStream есть ограничение буфера и терминальное состояние. Если вы пишете код так, будто yield всегда «доставит», то при нагрузке ваш consumer начнёт видеть не то, что вы ожидали, и это будет выглядеть как «рандом».

Ошибка №3: оставлять .unbounded на потоке частых событий и удивляться памяти и лагам.
.unbounded удобен, пока событий мало, но как только producer становится быстрее consumer, вы получаете накопление. Это может проявляться не только в памяти, но и в UX: вывод начинает «догонять прошлое» и запаздывает относительно реальности. Для прогресса и статуса чаще всего разумнее ограничивать буфер.

Ошибка №4: путать bufferingOldest и bufferingNewest по смыслу.
Название легко перепутать, особенно в конце тяжёлого дня. Тут помогает самопроверка: .bufferingNewest(n) означает «в буфере остаются самые новые n», то есть старые выталкиваются; .bufferingOldest(n) означает «в буфере остаются самые старые n», то есть новые будут выброшены при переполнении.

Ошибка №5: продолжать yield после finish() и ожидать, что consumer это увидит.
После finish() поток в терминальном состоянии, и дальнейшие yield уже не имеют нормального смысла: вы можете увидеть .terminated. В хорошей архитектуре вы заранее решаете, кто завершает поток, и после завершения больше не «публикуете» события в него.

1
Задача
Swift SELF, 71 уровень, 3 лекция
Недоступна
Последний датчик
Последний датчик
1
Задача
Swift SELF, 71 уровень, 3 лекция
Недоступна
Счётчик потерь
Счётчик потерь
1
Задача
Swift SELF, 71 уровень, 3 лекция
Недоступна
Прогресс по одному
Прогресс по одному
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ