JavaRush /Курси /ChatGPT Apps /Надійність потоків: rate-limits, backpressure і моніторин...

Надійність потоків: rate-limits, backpressure і моніторинг подій

ChatGPT Apps
Рівень 13 , Лекція 4
Відкрита

1. Чому потоки особливо чутливі до навантаження

У попередніх частинах ми вже розібралися, як влаштовані MCP‑події, статуси job.progress/job.completed, async‑jobʼи та потокові канали (SSE/HTTP-stream) для GiftGenius. Тепер важливо подивитися, що станеться з цією архітектурою під реальним навантаженням.

Поки у вас один користувач, який зрідка запускає підбір подарунка, усе виглядає чудово. Але щойно GiftGenius потрапляє в продакшн і до нього одночасно приходять сотні запитів на кшталт «підібрати подарунки всім співробітникам на корпоратив», ви раптом помічаєте, що:

  • на сервері — сотні довгоживучих SSE‑з’єднань;
  • воркери бадьоро шлють job.progress на кожну дрібницю;
  • логи щодня зростають на гігабайти;
  • UI у користувача починає пригальмовувати, хоча «сервер начебто не падає».

Звичайний HTTP‑запит живе мілісекунди або секунди. Натомість потік SSE чи HTTP‑stream може тривати хвилини й навіть години. Увесь цей час він утримує з’єднання, пам’ять і файлові дескриптори. Кожна надіслана подія — це серіалізація JSON, передавання мережею та робота GC. Якщо дивитися на це з думкою «та це ж просто ще один console.log на бекенді», система дуже швидко перетвориться на обігрівач.

У MCP‑подій є ще одна особливість: їх часто генерують багато разів для одного й того самого завдання. Воркер, який оновлює прогрес кожні 0,1 %, створює вражаючу кількість подій на одну job. У підсумку ви отримуєте «шум»: величезну кількість дрібних повідомлень, які:

  • навантажують мережу й CPU;
  • забивають черги та буфери;
  • роблять налагодження й аналіз логів болісними.

Тому і до потоків, і до MCP‑подій треба ставитися так само серйозно, як до запитів до бази чи викликів моделі. Це дорогий ресурс. Його потрібно нормувати, контролювати та моніторити.

Щоб із цим упоратися, тримайте в голові три великі теми:

  1. Rate‑limits — обмежуємо, скільки й як часто ми взагалі можемо собі дозволити генерувати та надсилати події/потоки.
  2. Backpressure — реагуємо на ситуацію, коли споживач не встигає за виробником.
  3. Моніторинг і метрики — вимірюємо, що відбувається, і вчасно помічаємо, що система починає «закипати».

2. Rate‑limiting потоків і подій

Почнімо з найочевиднішого — лімітів.

Важливо розуміти: у потокових сценаріях роль «небезпечного порушника» часто відіграє не клієнт, а сервер. У звичних REST‑API ви обмежуєте кількість запитів до сервера, щоб користувач не перетворився на DDoS. У світі MCP і потоків дуже легко отримати «зворотний DDoS»: воркер або MCP‑сервер засипає клієнта тисячами подій за секунду.

Які ліміти потрібні

Зазвичай про це думають у трьох площинах.

По‑перше, ліміти на користувача або сесію. Не можна дозволити одному користувачеві відкрити двадцять паралельних майстер‑віджетів GiftGenius, кожен зі своїм SSE‑потоком. Розумний варіант — кілька активних потоків на сесію та обмеження кількості jobʼів у статусі running для одного користувача або орендаря.

По‑друге, ліміти на одну job. Тут нас цікавить частота подій. Цілком достатньо надсилати job.progress не частіше разу на N мс або лише за помітної зміни — наприклад, кожні 5 % прогресу. Не потрібно слати повідомлення на кожен оброблений товар у каталозі. Також має сенс обмежувати розмір payload: подія прогресу не повинна тягнути за собою мегабайти тексту.

По‑третє, ліміти на IP або організацію. Це вже захист від зловживань: коли хтось запускає скрипт, який спамить завданнями, або коли ваш застосунок несподівано стає популярним. Тут у гру вступають знайомі механізми API‑gateway і проксі.

Проста реалізація ліміту частоти подій

Розгляньмо воркер GiftGenius, який у фоновому режимі підбирає подарунки за довгим списком отримувачів і періодично надсилає прогрес через MCP‑notification event/progress. Ми хочемо, щоб події надсилалися не частіше, ніж раз на 500 мс, і лише за зміни відсотка щонайменше на 5 пунктів.

Умовний псевдокод TS для воркера:

// Припустимо, є певний mcpClient.sendNotification(...)
let lastSentPercent = 0;
let lastSentAt = 0;

function reportProgress(jobId: string, percent: number, message: string) {
  const now = Date.now();
  const percentDelta = percent - lastSentPercent;
  const timeDelta = now - lastSentAt;

  // Надсилаємо, лише якщо минуло >= 500 мс АБО прогрес зріс щонайменше на 5%
  if (percentDelta >= 5 || timeDelta >= 500) {
    mcpClient.sendNotification("event/progress", {
      jobId,
      percent,
      message,
    });
    lastSentPercent = percent;
    lastSentAt = now;
  }
}

Такий підхід називається тротлінгом: ми «проріджуємо» потік подій у часі та за зміною значення.

Якщо ви розбиваєте процес на етапи («Етап 1 із 3», «Етап 2 із 3»), логіка ще простіша: надсилати події лише під час зміни етапу.

Ліміт на кількість одночасно відкритих потоків

На стороні MCP‑сервера у вас, найімовірніше, є HTTP‑обробник SSE:

// app/api/events/[userId]/route.ts (Next.js 16 App Router)
export async function GET(
  req: Request,
  { params }: { params: { userId: string } },
) {
  const userId = params.userId;

  if (!canOpenMoreStreams(userId)) {
    return new Response("Занадто багато потоків", { status: 429 });
  }

  const stream = new ReadableStream({
    start(controller) {
      registerSseClient(userId, controller);
    },
    cancel() {
      unregisterSseClient(userId);
    },
  });

  return new Response(stream, {
    headers: { "Content-Type": "text/event-stream" },
  });
}

Функція canOpenMoreStreams може перевіряти поточну кількість відкритих з’єднань для користувача й порівнювати її з порогом (наприклад, не більш ніж три паралельні потоки). Якщо ліміт перевищено, віддаємо 429 і в GPT‑інструкціях пояснюємо моделі, що в такій ситуації краще не запускати ще один довгий майстер. Натомість варто підказати користувачеві: «вже є активний підбір — дочекаймося його завершення».

У невеликих системах подібні перевірки можна реалізувати в пам’яті процесу. У серйознішій інфраструктурі це зазвичай переїжджає в MCP‑gateway або в окремий сервіс rate‑limit.

3. Backpressure: що робити, коли споживач не встигає

Rate‑limits обмежують, скільки подій ми хочемо виробляти. Але навіть за акуратних лімітів усе одно можлива ситуація, коли споживач «захлинається»: у користувача поганий мобільний інтернет, вкладка в браузері пригальмувала або ChatGPT у цей момент дуже завантажений.

Backpressure — це реакція системи на те, що споживач не встигає. Замість того щоб безкінечно накопичувати дані й рано чи пізно впасти з OOM, ми свідомо:

  • сповільнюємося;
  • агрегуємо події;
  • відкидаємо менш важливі.

Де виникає тиск

Типовий сценарій для GiftGenius може виглядати так. Воркер пише події в чергу (наприклад, Redis Streams або просто таблицю в БД). MCP‑сервер зчитує їх і надсилає в SSE‑канал. Якщо клієнт повільний (3G, старий ноутбук, купа інших вкладок), TCP‑буфер починає заповнюватися. Node‑процес не встигає повністю вичитати чергу й у підсумку накопичує події в пам’яті. Далі ви бачите знайоме:

FATAL ERROR: Ineffective mark-compacts near heap limit

Backpressure на рівні мережі (TCP) у вас уже є, але він не знає про ваші доменні сутності. Він просто каже: «Гей, пригальмуй — буфер заповнений». Наше завдання — інтерпретувати це на рівні MCP‑подій.

Буферизація з лімітом і відкидання подій

Для прогресу й статусів у нас є приємна особливість: не всі події однаково цінні. Користувачеві важливо знати останній актуальний відсоток, а не історію всіх проміжних «51 %, 52 %, 53 %, 54 %». Це означає, що ми можемо сміливо відкидати частину подій і надсилати лише останню.

Нехай у нас є шар, який отримує події прогресу від воркерів і кладе їх у буфер для кожного jobId:

type ProgressEvent = { jobId: string; percent: number; message: string };

const progressBuffers = new Map<string, ProgressEvent[]>();
const MAX_BUFFER = 10;

function bufferProgress(event: ProgressEvent) {
  const buffer = progressBuffers.get(event.jobId) ?? [];
  buffer.push(event);

  // Обмежуємо розмір буфера
  if (buffer.length > MAX_BUFFER) {
    // Залишаємо лише останні кілька подій
    progressBuffers.set(event.jobId, buffer.slice(-MAX_BUFFER));
  } else {
    progressBuffers.set(event.jobId, buffer);
  }
}

Окремий таймер, наприклад раз на 500 мс, дивиться на буфер і надсилає лише останню подію, ігноруючи інші:

setInterval(() => {
  for (const [jobId, buffer] of progressBuffers.entries()) {
    if (!buffer.length) continue;

    const last = buffer[buffer.length - 1];
    sendProgressToClient(last); // SSE/MCP notification

    progressBuffers.set(jobId, []); // Очищаємо
  }
}, 500);

Це приклад тактики conflation: об’єднання кількох оновлень в одне актуальне. Для прогресу це «золотий» патерн.

Для подій типу «лог» або partial_result стратегія може бути іншою. Там втрата подій часто неприпустима: текст логів важливий, а зниклий JSON‑чанк може зламати структуру даних. У таких випадках ви можете:

  • агрегувати повідомлення (склеїти кілька рядків логів в один пакет);
  • або надіслати керувальний сигнал воркеру «пригальмуй генерацію логів».

В асинхронних системах другий варіант складніший, але про нього варто хоча б пам’ятати.

Обмеження глибини черг

Backpressure не обмежується буфером подій прямо перед відправленням. Потрібно дивитися на всі черги в системі:

  • чергу завдань, що очікують воркера;
  • чергу подій між воркером і MCP‑сервером;
  • буфери всередині стримінгових бібліотек на стороні сервера.

Для кожної черги важливо призначити розумний ліміт глибини. Якщо черга переповнюється, ви або починаєте відповідати клієнтам «система перевантажена, спробуйте пізніше», або відкидаєте менш важливі jobʼи, або переводите частину сценаріїв в «офлайн‑режим» (наприклад, формуєте звіт і надсилаєте посилання на нього пізніше).

Окремий цікавий прийом — пріоритизація типів подій. За перевантаження ви можете почати надсилати тільки job.completed і job.failed, а job.progress знизити в пріоритеті або вимкнути зовсім.

4. Моніторинг потоків і подій

Без вимірювань уся ця краса з rate‑limits і backpressure перетворюється на шаманство. Потрібно бачити, що потоків стало підозріло багато, події йдуть із затримкою, а клієнти масово відвалюються.

Потоки поводяться інакше, ніж звичайні HTTP‑запити: їхня тривалість може обчислюватися хвилинами й годинами. Тому класичні метрики «запитів за секунду» і «середня латентність» не дають повної картини.

Ключові метрики

Для потоків SSE або HTTP-stream корисно стежити за кількома групами показників.

  1. Метрики з’єднань. Скільки зараз активних потоків SSE? Як довго в середньому живе одне з’єднання? Який відсоток потоків завершується помилкою або таймаутом? Різкий сплеск активних з’єднань говорить про потенційний шторм трафіку або витік ресурсів (клієнти не закривають з’єднання). Різкий спад — про масовий обрив (наприклад, через проблеми в мережі або критичний баг на сервері).
  2. Метрики подій. Скільки подій ви надсилаєте за секунду по всіх потоках (EPS — events per second, тобто кількість подій за секунду)? Який середній розмір події? Скільки помилок десеріалізації або валідації payload ви спостерігаєте? Якщо ви раптом бачите зростання розміру подій, можливо, хтось почав слати в job.progress замість короткого рядка весь текст звіту.
  3. Метрики jobʼів. Розподіл за статусами (pending, running, completed, failed, canceled), середній час виконання за типом завдання, відсоток jobʼів, що йдуть у retry або в dead‑letter. Це допомагає зрозуміти, що проблеми не лише на мережевому рівні, а й у воркерах: зовнішнє API стало повільнішим або з’явилися масові помилки.
  4. Метрики backpressure і системні показники. У потокових системах часто дивляться на глибину буферів і черг між компонентами, а також на відсоток часу, коли потік заблокований і чекає, доки споживач звільнить місце. Якщо ваші черги майже завжди заповнені вщерть, це чіткий сигнал: система на межі. Також важливо стежити за системними показниками: CPU і пам’ять на серверах, які займаються стримінгом, та помилки/таймаути на мережевому рівні. Іноді саме пропускна здатність мережі між MCP‑сервером і ChatGPT стає вузьким місцем.

У сумі ці чотири групи дають вам відповідь на три питання: скільки потоків зараз живе, скільки даних ви передаєте, як поводяться jobʼи та де саме система починає захлинатися.

Що логувати

Логи — другий стовп спостережності. Важливо логувати події та з’єднання так, щоб потім можна було зібрати історію для конкретної job.

Зазвичай у логи для кожної події та потоку додають:

  • jobId та/або eventId;
  • userId і sessionId (якщо є мультиорендність);
  • тип події (progress, completed, failed, resource.updated);
  • тип каналу (SSE або HTTP-stream);
  • часову мітку відправлення і, за можливості, часову мітку народження події у воркера.

Так можна обчислити затримку (lag): різницю між часом, коли воркер згенерував подію, і часом, коли вона пішла в сокет. Зростання цієї затримки — добрий індикатор проблем із backpressure.

Потрібно бути обережними, щоб логи самі не стали джерелом перевантаження. Для високочастотних подій на кшталт job.progress логувати кожну подію не завжди розумно. Можна вмикати вибіркове логування (sampling) — записувати кожну N‑ту подію замість усіх підряд — або агрегувати статистику.

Кодовий приклад може виглядати як проста допоміжна функція:

function logEvent(event: {
  type: string;
  jobId: string;
  userId?: string;
  channel: "sse" | "http-stream";
  payload: unknown;
}) {
  console.info({
    ...event,
    timestamp: new Date().toISOString(),
  });
}

У реальному проєкті ви обгортаєте це в бібліотеку структурованого логування, але ідея та сама: максимум корисного контексту в кожному записі.

5. Алерти та політики деградації

Коли у вас уже є метрики й логи, наступний крок — налаштувати алерти та продумати, як система має «деградувати», коли їй погано. Ідея проста: краще чесно працювати гірше, ніж раптово впасти.

Приклади алертів

Для GiftGenius розумно стежити за кількома типовими ситуаціями.

По‑перше, аномальна кількість активних потоків. Якщо зазвичай у вас десятки активних SSE‑з’єднань, а раптом їх стало тисячі, варто з’ясувати, що відбувається. Можливо, ви стали популярні. А можливо — у вас баг, і з’єднання не закриваються.

По‑друге, затримка між фактичним завершенням job і отриманням job.completed клієнтом. Якщо ця затримка починає перевищувати поріг (скажімо, 510 секунд), значить десь між воркером і клієнтом накопичуються події або «пробуксовують» з’єднання.

По‑третє, висока частка job.failed або job.canceled порівняно з успішними. Причина може бути як у воркері (зламане зовнішнє API, новий баг), так і в підвищеній чутливості користувачів до затримок (вони починають частіше скасовувати завдання).

Нарешті, підвищений рівень помилок під’єднання й розривів потоку. Якщо зростає кількість нештатних розривів, можливо, проблеми в мережі або на стороні клієнта. У такій ситуації варто продумати резервні сценарії.

Degradation‑патерни

Коли система перевантажена, можна вмикати «режим заощадження ресурсів». Це краще, ніж просто почати відповідати 500 на все.

Найчастіший патерн — адаптивна частота подій. Якщо ви бачите, що event-rate (кількість подій за секунду) злетів удесятеро вище звичного і в чергах починає зростати затримка, зменшуйте частоту подій прогресу. Було кожні 1 % — зробіть кожні 10 %. Було кожні 500 мс — зробіть раз на 23 секунди. Користувач чудово проживе без надточного прогресу, а от із повністю завислим UI — навряд.

Для менш важливих подій — наприклад, resource.updated під час бекграунд‑оновлення фіда товарів — можна тимчасово вимкнути відправлення зовсім, поки система під навантаженням.

Ще один прийом — переведення частини сценаріїв із потоків на періодичне опитування. Якщо SSE‑канали валяться, MCP‑сервер може надіслати віджету системну подію на кшталт system.overloaded, а віджет — перемкнутися на стратегію «раз на N секунд опитую REST‑endpoint про статус job».

6. Невеликий практичний фрагмент для GiftGenius

Щоб пов’язати все докупи, уявімо, що в нас уже є:

  • MCP‑tool startGiftSearch, який створює job і повертає jobId;
  • воркер, який виконує пошук і надсилає event/progress та event/completed;
  • SSE‑endpoint /api/events/[userId], до якого підключається віджет у Next.js.

Додамо простий рівень захисту від «шторму подій» і мінімальний моніторинг.

Обмеження прогресу за кроками і часом

У воркері ми додаємо тротлінг і conflation, як обговорювали вище. Тепер події надсилаються не частіше, ніж раз на пів секунди, і за зміни не менш ніж на 5 %.

Облік активних потоків

У SSE‑endpoint зберігаємо лічильник за користувачем:

const activeStreams = new Map<string, number>();
const STREAM_LIMIT = 3;

function canOpenMoreStreams(userId: string) {
  const current = activeStreams.get(userId) ?? 0;
  return current < STREAM_LIMIT;
}

function registerSseClient(userId: string, controller: ReadableStreamDefaultController) {
  const current = activeStreams.get(userId) ?? 0;
  activeStreams.set(userId, current + 1);

  // Тут зберігаєте controller в якійсь структурі,
  // щоб пізніше писати в цей потік події
}

function unregisterSseClient(userId: string) {
  const current = activeStreams.get(userId) ?? 1;
  activeStreams.set(userId, Math.max(0, current - 1));
}

Метрики за activeStreams.size сервер може додатково надсилати в Prometheus/Grafana або будь‑яку іншу систему моніторингу.

Найпростіша метрика event‑rate

Для початку можна хоча б порахувати, скільки подій ми надсилаємо:

let eventsSentLastMinute = 0;

function sendProgressToClient(ev: ProgressEvent) {
  // ... серіалізація і запис у SSE‑потік
  eventsSentLastMinute++;
}

setInterval(() => {
  console.info({
    metric: "events_per_minute",
    value: eventsSentLastMinute,
    timestamp: new Date().toISOString(),
  });
  eventsSentLastMinute = 0;
}, 60_000);

З часом це можна замінити на нормальні лічильники й алерти, але як стартова точка — уже непогано.

Якщо зібрати все вище — ліміти, backpressure, метрики/алерти та адекватний UX‑fallback, — ваш GiftGenius перестає бути «демкою для показу» і витримує реальні шторми трафіку. У наступних модулях, де ми говоритимемо про gateway, production‑архітектуру та повноцінну спостережність, ці патерни нам ще знадобляться.

7. Типові помилки під час роботи з потоками, rate‑limits і моніторингом

Помилка № 1: відсутність лімітів на кількість потоків і частоту подій.
Розробники додали SSE, «щоб було красиво». Воркери чесно надсилають прогрес на кожен оброблений об’єкт — і на демо все наче працює. Але за першого сплеску реальних користувачів сервер починає витрачати більшу частину ресурсів на серіалізацію та пересилання тисяч крихітних подій, а UI у ChatGPT перетворюється на слайд‑шоу.

Помилка № 2: спроба буферизувати «все й одразу» без обмежень.
У коді з’являється необмежений масив «невідісланих подій», який зростає доти, доки клієнт не «оживе». Спойлер: зазвичай раніше «помирає» сервер. Будь‑який буфер повинен мати жорсткий максимум, а логіка обробки переповнення — бути явною.

Помилка № 3: однакове ставлення до всіх типів подій.
Прогрес можна агрегувати й відкидати (останній відсоток важливіший за історію змін). З логами і partial‑результатами так робити не можна: втрата одного чанка може означати пошкоджені дані. Коли проєктуєте систему, заздалегідь групуйте події за важливістю й продумуйте для кожної групи стратегію на випадок перевантаження.

Помилка № 4: відсутність спостережності.
Жодних метрик за активними потоками, жодного обліку event-rate, у логах — лише «щось пішло не так». У такій ситуації ви дізнаєтеся про проблеми лише з відгуків користувачів і графіка навантаження на CPU. Налаштувати хоча б базові метрики та логи за jobId і eventId — не розкіш, а необхідність.

Помилка № 5: жорсткий UX, який не враховує деградацію.
Віджет і GPT‑інструкції виходять із того, що потік завжди доступний, прогрес оновлюється «в реальному часі», partial‑results приходять строго за сценарієм. За перших же мережевих проблем користувач бачить «завислий» прогрес‑бар і жодного пояснення. Значно краще закласти в UX чесний fallback: «Зараз є проблеми з живим оновленням. Я все одно продовжу підбір і повідомлю, коли закінчу» — і перейти на рідші оновлення або опитування.

Помилка № 6: віра в те, що «наші користувачі не створять багато одночасних задач».
Практика показує: якщо ви не обмежили кількість паралельних jobʼів і потоків, хтось обов’язково відкриє п’ять вкладок, запустить у кожній підбір подарунків «по максимуму» й піде пити каву. Ідея «авось пронесе» в продакшні майже завжди закінчується знайомством із моніторингом під гуркіт алертів.

1
Опитування
Сповіщення, рівень 13, лекція 4
Недоступний
Сповіщення
Сповіщення та потокові сценарії (події MCP)
Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ