1. Почему потоки особенно чувствительны к нагрузке
В прошлых частях мы уже разобрали, как устроены MCP‑события, статусы job.progress/job.completed, async‑job’ы и потоковые каналы (SSE/HTTP-stream) для GiftGenius. Теперь важно посмотреть, что с этой архитектурой будет под реальной нагрузкой.
Пока у вас один пользователь, который изредка запускает подбор подарка, всё выглядит прекрасно. Но как только GiftGenius попадает в продакшн и в него одновременно приходят сотни запросов на «подобрать подарки всем сотрудникам на корпоратив», вы внезапно обнаруживаете, что:
- на сервере сотни долгоживущих SSE‑соединений;
- воркеры бодро шлют job.progress на каждый чих;
- логи растут гигабайтами в день;
- UI у пользователя начинает подлагивать, хотя «сервер вроде не падает».
Классический HTTP‑запрос живёт миллисекды или секунды. Поток SSE или HTTP‑stream может жить минуты и даже часы. Он удерживает соединение, память, файловые дескрипторы. Каждый отправленный event — это сериализация JSON, копирование по сети, работа GC. Если на это смотреть как на «ну просто ещё один console.log на бэке», система очень быстро превратится в обогреватель.
У MCP‑events есть ещё одна особенность: они часто генерируются многократно для одной и той же задачи. Воркер, который обновляет прогресс каждые 0.1%, даёт впечатляющее число событий на одну job. В итоге вы получаете «шум»: огромное количество мелких сообщений, которые:
- грузят сеть и CPU;
- забивают очереди и буферы;
- делают отладку и анализ логов болезненными.
Поэтому и к потокам, и к MCP‑events нужно относиться так же серьёзно, как к запросам к базе или вызовам модели: это дорогой ресурс, который требует нормирования, контроля и мониторинга.
Чтобы с этим справиться, держим в голове три больших темы:
- Rate‑limits — ограничиваем, сколько и как часто мы вообще можем себе позволить генерировать и отправлять события/потоки.
- Backpressure — реагируем на ситуацию, когда потребитель не успевает за производителем.
- Мониторинг и метрики — измеряем, что происходит, и вовремя замечаем, что всё начало закипать.
2. Rate‑limiting потоков и событий
Начнём с самого понятного — лимитов.
Важно понимать, что в потоковых сценариях роль «опасного нарушителя» часто играет не клиент, а сервер. В обычных REST‑API вы ограничиваете количество запросов к серверу, чтобы пользователь не превратился в DDoS. В мире MCP и потоков очень легко устроить обратный DDoS: воркер или MCP‑сервер бомбит клиента тысячами событий в секунду.
Какие лимиты нужны
Обычно думают в трёх плоскостях.
Во‑первых, лимиты на пользователя или сессию. Нельзя позволить одному пользователю открыть двадцать параллельных мастер‑виджетов GiftGenius, каждый со своим SSE‑потоком. Разумное ограничение — несколько активных потоков на сессию и ограничение числа job’ов в статусе running для одного пользователя или арендатора.
Во‑вторых, лимиты на одну job. Здесь нас интересует частота событий. Вполне достаточно отправлять job.progress не чаще раза в N миллисекунд или только при заметном изменении, например каждые 5% прогресса. Не нужно слать сообщение на каждый обработанный товар в каталоге. Также имеет смысл ограничивать размер payload: событие прогресса не должно нести в себе мегабайты текста.
В‑третьих, лимиты на IP или организацию. Это уже защита от злоупотреблений, когда кто‑то запускает скрипт, который спамит задачами, или когда ваш App становится неожиданно популярным. Тут уже в игру вступают знакомые механизмы API‑gateway’ов и прокси.
Простая реализация лимита частоты событий
Рассмотрим воркер GiftGenius, который в фоне подбирает подарки по длинному списку получателей и периодически шлёт прогресс через MCP‑notification event/progress. Мы хотим, чтобы события отправлялись не чаще, чем раз в 500 миллисекунд и только при изменении процента хотя бы на 5 пунктов.
Условный псевдо‑код TS для воркера:
// допустим, есть некий mcpClient.sendNotification(...)
let lastSentPercent = 0;
let lastSentAt = 0;
function reportProgress(jobId: string, percent: number, message: string) {
const now = Date.now();
const percentDelta = percent - lastSentPercent;
const timeDelta = now - lastSentAt;
// отправляем только если прошло >= 500 мс ИЛИ выросло >= на 5%
if (percentDelta >= 5 || timeDelta >= 500) {
mcpClient.sendNotification("event/progress", {
jobId,
percent,
message,
});
lastSentPercent = percent;
lastSentAt = now;
}
}
Такой подход называется троттлингом: мы «прореживаем» поток событий по времени и по изменению значения.
Если вы выполняете разбивку на этапы («Этап 1 из 3», «Этап 2 из 3»), логика ещё проще: отправлять события только при смене этапа.
Лимит на количество одновременно открытых потоков
На стороне MCP‑сервера у вас, скорее всего, есть HTTP‑обработчик SSE:
// app/api/events/[userId]/route.ts (Next.js 16 App Router)
export async function GET(
req: Request,
{ params }: { params: { userId: string } },
) {
const userId = params.userId;
if (!canOpenMoreStreams(userId)) {
return new Response("Too many streams", { status: 429 });
}
const stream = new ReadableStream({
start(controller) {
registerSseClient(userId, controller);
},
cancel() {
unregisterSseClient(userId);
},
});
return new Response(stream, {
headers: { "Content-Type": "text/event-stream" },
});
}
Функция canOpenMoreStreams может проверять текущее число открытых соединений для пользователя и сравнивать с порогом (например, не более трёх параллельных потоков). Если лимит превышен, отдаём 429 и в GPT‑инструкциях объясняем модели, что в такой ситуации лучше не запускать ещё один длинный мастер, а подсказать пользователю, что «есть уже активный подбор, давайте его дождёмся».
В маленьких системах подобные проверки можно реализовать в памяти процесса. В более серьёзной инфраструктуре это уезжает в MCP‑gateway или отдельный rate‑limit‑сервис.
3. Backpressure: что делать, когда потребитель не успевает
Rate‑limits ограничивают, сколько мы хотим производить событий. Но даже при аккуратных лимитах всё равно возможна ситуация, когда потребитель «захлёбывается»: у пользователя плохой мобильный интернет, вкладка в браузере подвисла, ChatGPT в данный момент сильно загружен.
Backpressure — это реакция системы на то, что потребитель не успевает. Вместо того чтобы бесконечно накапливать данные и рано или поздно упасть с OOM, мы осознанно:
- замедляемся;
- агрегируем события;
- отбрасываем менее важные.
Где возникает давление
Типичный сценарий для GiftGenius может выглядеть так. Воркер пишет события в очередь (например, Redis Streams или просто таблицу в БД), MCP‑сервер считывает их и пушит в SSE‑канал. Если клиент медленный (3G, старый ноутбук, куча других вкладок), TCP‑буфер начинает заполняться, Node‑процесс не успевает полностью выкачать очередь и в итоге накапливает события в памяти. Дальше вы видите знакомое:
FATAL ERROR: Ineffective mark-compacts near heap limit
Backpressure на уровне сети (TCP) у вас уже есть, но он не знает про ваши доменные сущности. Он просто говорит: «Эй, притормози, буфер забит». Наша задача — проинтерпретировать это на уровне MCP‑событий.
Буферизация с лимитом и отбрасывание событий
Для прогресса и статусов у нас есть приятная особенность: не все события одинаково ценны. Пользователю важно знать последний актуальный процент, а не историю всех промежуточных «51%, 52%, 53%, 54%». Это значит, что мы можем смело дропать часть событий и отправлять только последнее.
Пусть у нас есть слой, который получает события прогресса от воркеров и кладёт их в буфер для каждого jobId:
type ProgressEvent = { jobId: string; percent: number; message: string };
const progressBuffers = new Map<string, ProgressEvent[]>();
const MAX_BUFFER = 10;
function bufferProgress(event: ProgressEvent) {
const buffer = progressBuffers.get(event.jobId) ?? [];
buffer.push(event);
// ограничиваем размер буфера
if (buffer.length > MAX_BUFFER) {
// оставляем только последние несколько событий
progressBuffers.set(event.jobId, buffer.slice(-MAX_BUFFER));
} else {
progressBuffers.set(event.jobId, buffer);
}
}
Отдельный таймер, например раз в 500 мс, смотрит на буфер и отправляет только последнее событие, игнорируя остальные:
setInterval(() => {
for (const [jobId, buffer] of progressBuffers.entries()) {
if (!buffer.length) continue;
const last = buffer[buffer.length - 1];
sendProgressToClient(last); // SSE/MCP notification
progressBuffers.set(jobId, []); // очищаем
}
}, 500);
Это пример тактики conflation: объединение нескольких апдейтов в один актуальный. Для прогресса — золотой паттерн.
Для событий типа «лог» или partial_result стратегия может быть другой. Там потеря событий часто недопустима: текст логов важен, а пропавший JSON‑чанк может сломать структуру данных. В этих случаях вы можете:
- агрегировать сообщение (склеить несколько строк логов в один пакет);
- или послать управляющий сигнал воркеру «притормози генерацию логов».
В асинхронных системах второй вариант сложнее, но о нём стоит хотя бы думать.
Ограничение глубины очередей
Backpressure не ограничивается буфером событий прямо перед отправкой. Нужно смотреть на все очереди в системе:
- очередь задач, ожидающих воркера;
- очередь событий между воркером и MCP‑сервером;
- буферы внутри стриминговых библиотек на стороне сервера.
Для каждой очереди важно назначить разумный лимит глубины. Если очередь переполняется, вы либо начинаете отвечать клиентам «система перегружена, попробуйте позже», либо отбрасываете менее важные job’ы, либо переводите часть сценариев в «офлайн‑режим» (например, формируете отчёт и отправляете ссылку на него позже).
Отдельный интересный приём — приоритизация типов событий. При перегрузке вы можете начать отправлять только job.completed и job.failed, а job.progress понизить в приоритете или выключить вовсе.
4. Мониторинг потоков и событий
Без измерений вся эта красота с rate‑limits и backpressure превращается в шаманство. Нужно видеть, что потоков стало подозрительно много, события идут с лагом, а клиенты пачками отваливаются.
Потоки ведут себя иначе, чем обычные HTTP‑запросы: их длительность может исчисляться минутами и часами, поэтому классические метрики «запросов в секунду» и «средняя латентность» не дают полной картины.
Ключевые метрики
Для потоков SSE или HTTP/stream полезно следить за несколькими группами показателей.
- Метрики соединений. Сколько сейчас активных потоков SSE? Как долго в среднем живёт одно соединение? Какой процент потоков завершается ошибкой или таймаутом? Резкий всплеск активных соединений говорит о потенциальном шторме трафика или утечке ресурсов (клиенты не закрывают соединения). Резкий спад — о массовом обрыве (например, проблемы в сети или критический баг на сервере).
- Метрики событий. Сколько событий вы отправляете в секунду по всем потокам (EPS — events per second, по сути число событий в секунду)? Каков средний размер события? Сколько ошибок десериализации или валидации payload вы наблюдаете? Если вы вдруг видите рост размера событий — возможно, кто‑то начал слать в job.progress вместо короткой строки весь текст отчёта.
- Метрики job’ов. Распределение по статусам (pending, running, completed, failed, canceled), среднее время выполнения по типу задачи, процент job’ов, уходящих в retry или в dead‑letter. Это помогает понять, что проблемы не только в сетевом уровне, но и в воркерах: внешнее API стало медленнее, появились массовые ошибки.
- Метрики backpressure и системные показатели. В потоковых системах часто смотрят на глубину буферов и очередей между компонентами, а также на процент времени, когда поток заблокирован, ожидая, что потребитель освободит место. Если ваши очереди почти всегда заполнены под завязку, это ясный сигнал, что система на пределе. Также важно следить за системными показателями: CPU и память на серверах, которые занимаются стримингом, и ошибки/таймауты на сетевом уровне. Иногда именно пропускная способность сети между MCP‑сервером и ChatGPT становится узким местом.
В сумме эти четыре группы дают вам ответ на три вопроса: сколько потоков сейчас живёт, сколько данных вы гоняете, как ведут себя job’ы и где именно система начинает захлёбываться.
Что логировать
Логи — второй столп наблюдаемости. Важно логировать события и соединения так, чтобы потом можно было собрать историю для конкретной job.
Обычно в логи для каждого события и потока добавляют:
- jobId и/или eventId;
- userId и sessionId (если есть мультиарендность);
- тип события (progress, completed, failed, resource.updated);
- тип канала (SSE или HTTP/stream);
- timestamp отправки и, по возможности, timestamp рождения события у воркера.
Так можно вычислить lag: разницу между временем, когда воркер сгенерировал событие, и временем, когда оно ушло в сокет. Рост этого lag‑времени — хороший индикатор проблем с backpressure.
Нужно быть аккуратным, чтобы логи сами не стали источником перегрузки. Для высокочастотных событий вроде job.progress логировать каждое событие не всегда разумно; можно включать sampling — логировать каждое N‑ое событие вместо всех подряд — или агрегировать статистику.
Кодово это может выглядеть как простой helper:
function logEvent(event: {
type: string;
jobId: string;
userId?: string;
channel: "sse" | "http-stream";
payload: unknown;
}) {
console.info({
...event,
timestamp: new Date().toISOString(),
});
}
В реальном проекте вы оборачиваете это в structured logging‑библиотеку, но идея та же: максимум полезного контекста в каждой записи.
5. Алерты и политики деградации
Когда у вас уже есть метрики и логи, следующий шаг — настроить алерты и продумать, как система должна «деградировать», когда ей плохо. Идея в том, что лучше честно работать хуже, чем внезапно упасть.
Примеры алертов
Для GiftGenius разумно следить за несколькими типичными ситуациями.
Во‑первых, аномальное количество активных потоков. Если обычно у вас десятки активных SSE‑соединений, а вдруг стало тысячи, стоит узнать, что происходит. Может, вы стали популярны, а может — у вас баг и соединения не закрываются.
Во‑вторых, задержка между фактическим завершением job’ы и получением job.completed клиентом. Если эта задержка начинает превышать порог (скажем, 5–10 секунд), значит где‑то между воркером и клиентом накапливаются события или пробуксовывают соединения.
В‑третьих, высокая доля job.failed или job.canceled по сравнению с успешными. Причина может быть как в воркере (сломанное внешнее API, новый баг), так и в повышенной чувствительности пользователя к задержкам (они начинают чаще отменять задачи).
Наконец, повышенный уровень ошибок подключения и разрывов стрима: если растёт число нештатных disconnect’ов, возможно, проблемы в сети или на стороне клиента, и стоит подумать о fallback‑сценариях.
Degradation‑паттерны
Когда система перегружена, можно включать «режим экономии ресурсов». Это лучше, чем просто начать отвечать 500 на всё.
Самый частый паттерн — адаптивная частота событий. Если вы видите, что event‑rate (число событий в секунду) взлетел в десять раз выше обычного и в очередях начинает расти лаг, уменьшайте частоту прогресс‑событий. Было каждые 1% — делайте каждые 10%. Было каждые 500 мс — делайте раз в 2–3 секунды. Пользователь прекрасно проживёт без сверхточного прогресса, а вот с полностью зависшим UI — не очень.
Для менее важных событий — например, resource.updated при бэкграунд‑обновлении фида товаров — можно временно отключать отправку вовсе, пока система под нагрузкой.
Ещё один приём — перевод части сценариев с потоков на периодический поллинг. Если SSE‑каналы обваливаются, MCP‑сервер может послать виджету системное событие вроде system.overloaded, а виджет — переключиться на стратегию «раз в N секунд опрашиваю REST‑endpoint про статус job’ы».
6. Небольшой практический фрагмент для GiftGenius
Чтобы связать всё воедино, давайте представим, что у нас уже есть:
- MCP‑tool startGiftSearch, который создаёт job и возвращает jobId;
- воркер, который выполняет поиск и шлёт event/progress и event/completed;
- SSE‑endpoint /api/events/[userId], к которому подключается виджет в Next.js.
Добавим простой уровень защиты от «шторма событий» и минимальный мониторинг.
Ограничение прогресса по шагам и времени
В воркере мы добавляем троттлинг и conflation, как обсуждали выше. Теперь события отправляются не чаще, чем раз в полсекунды и при изменении не менее 5%.
Учёт активных потоков
В SSE‑endpoint храним счётчик по пользователю:
const activeStreams = new Map<string, number>();
const STREAM_LIMIT = 3;
function canOpenMoreStreams(userId: string) {
const current = activeStreams.get(userId) ?? 0;
return current < STREAM_LIMIT;
}
function registerSseClient(userId: string, controller: ReadableStreamDefaultController) {
const current = activeStreams.get(userId) ?? 0;
activeStreams.set(userId, current + 1);
// здесь сохраняете controller в какую‑то структуру,
// чтобы позже писать в этот поток события
}
function unregisterSseClient(userId: string) {
const current = activeStreams.get(userId) ?? 1;
activeStreams.set(userId, Math.max(0, current - 1));
}
Метрики по activeStreams.size сервер может дополнительно отправлять в Prometheus/Grafana или любую другую систему мониторинга.
Простейшая метрика event‑rate
Для начала можно хоть как‑то посчитать, сколько событий мы шлём:
let eventsSentLastMinute = 0;
function sendProgressToClient(ev: ProgressEvent) {
// ... сериализация и запись в SSE‑поток
eventsSentLastMinute++;
}
setInterval(() => {
console.info({
metric: "events_per_minute",
value: eventsSentLastMinute,
timestamp: new Date().toISOString(),
});
eventsSentLastMinute = 0;
}, 60_000);
Со временем это можно заменить на нормальные счетчики и алерты, но как стартовая точка — уже неплохо.
Если собрать всё выше: лимиты, backpressure, метрики/алерты и адекватный UX‑fallback делают так, что ваш GiftGenius перестаёт быть «демкой для демо» и выдерживает реальные шторма трафика. В следующих модулях, где мы будем говорить про gateway, production‑архитектуру и полноценную наблюдаемость, эти паттерны нам ещё пригодятся.
7. Типичные ошибки при работе с потоками, rate‑limits и мониторингом
Ошибка №1: отсутствие лимитов на количество потоков и частоту событий.
Разработчики добавили SSE, «чтобы было красиво», воркеры честно шлют прогресс на каждый обработанный объект, и всё вроде работает на демо. Но при первом всплеске реальных пользователей сервер начинает тратить большую часть ресурсов на сериализацию и пересылку тысяч крошечных событий, а UI в ChatGPT превращается в слайд‑шоу.
Ошибка №2: попытка буферизовать «всё и сразу» без ограничений.
В коде появляется неограниченный массив «невышедших событий», который растёт до тех пор, пока клиент не восстановится. Спойлер: он не восстанавливается, сервер умирает раньше. Любой буфер должен иметь жёсткий максимум, а логика обработки переполнения — быть явной.
Ошибка №3: одинаковое отношение ко всем типам событий.
Прогресс можно агрегировать и дропать (последний процент важнее истории движения). С логами и partial‑результатами так делать нельзя — потеря одного чанка может означать повреждённые данные. Когда проектируете систему, заранее группируйте события по важности и придумывайте для каждой группы стратегию при перегрузке.
Ошибка №4: отсутствие наблюдаемости.
Никаких метрик по активным потокам, никакого учёта event‑rate, в логах — только «что‑то пошло не так». В такой ситуации вы узнаёте о проблемах только из отзывов пользователей и графика нагрузки на CPU. Настроить хотя бы базовые метрики и логи по jobId и eventId — не роскошь, а необходимость.
Ошибка №5: жёсткий UX, не учитывающий деградацию.
Виджет и GPT‑инструкции исходят из того, что поток всегда доступен, прогресс обновляется «в реальном времени», partial‑results приходят строго по сценарию. При первых же сетевых проблемах пользователь видит «зависший» прогресс‑бар и ни одного объяснения. Гораздо лучше заложить в UX честный fallback: «Сейчас есть проблемы с живым обновлением, я всё равно продолжу подбор и сообщу, когда закончу» — и перейти на более редкие обновления или поллинг.
Ошибка №6: доверие к тому, что «наши пользователи не создадут много одновременных задач».
Практика показывает, что если вы не ограничили число параллельных job’ов и потоков, кто‑нибудь обязательно откроет пять вкладок, запустит в каждой подбор подарков «по максимуму» и пойдёт пить кофе. Идея «авось пронесёт» в продакшене почти всегда заканчивается знакомством с мониторингом под грохот алертов.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ