JavaRush /Курсы /ChatGPT Apps /Потоковые каналы: SSE и HTTP/stream — когда и как их испо...

Потоковые каналы: SSE и HTTP/stream — когда и как их использовать

ChatGPT Apps
13 уровень , 1 лекция
Открыта

1. Где вообще появляются «потоки» в архитектуре ChatGPT App

Прежде чем спорить, что лучше — SSE или HTTP-stream, полезно понять, где в нашем стеке вообще существуют потоки.

Условно можно выделить три уровня.

Во‑первых, уровень ChatGPT и модели. Модель сама по себе уже стримит ответ токенами: вы видите, как текст ответа «печатается» по буквам. Это тоже поток, но он полностью управляется OpenAI и к вашему коду напрямую не относится.

Во‑вторых, уровень MCP. Когда ChatGPT подключается к вашему MCP‑серверу, он, как правило, держит SSE‑соединение: сервер пушит в него JSON‑RPC‑сообщения MCP (ответы и нотификации), а ChatGPT в ответ шлёт запросы на отдельный HTTP‑endpoint, например /messages. В терминах MCP это базовый транспорт.

В‑третьих, уровень Apps SDK и вашего backend. Ваш React‑виджет GiftGenius запущен в песочнице ChatGPT и общается с вашим backend/MCP‑gateway по HTTP: через обычный fetch, через fetch c потоком (ReadableStream) или через SSE‑подписку (EventSource).

Важно не смешивать эти уровни в одну кашу. MCP‑события — это «провод» между ChatGPT и серверами; а SSE/HTTP-stream между виджетом и вашим HTTP‑backend — это уже ваш участок дороги.

Можно изобразить картинку.

flowchart TD
  subgraph ChatGPT
    UI[ChatGPT UI + модель]
    W[GiftGenius Widget]
  end

  subgraph YourInfra[Инфраструктура разработчика]
    GW[MCP Gateway / Backend]
    MCP[MCP Server]
  end

  UI -- "tool-call / ответы\n(внутренний стрим токенов)" --> W

  UI <-- "MCP over SSE\n(/sse + /messages)" --> MCP

  W <-- "HTTP / fetch / SSE / stream" --> GW

  GW <-- "JSON-RPC MCP" --> MCP

Сегодня мы сосредоточимся на стрелке Widget ↔ Backend и частично вспомним, что MCP‑транспорт сам по себе тоже основан на SSE.

Именно на этом участке — Widget ↔ Backend — нам и приходится выбирать, как общаться: простыми HTTP‑запросами или потоками. В следующем разделе посмотрим, почему «обычного» HTTP здесь быстро перестаёт хватать.

2. Почему обычного HTTP‑запроса недостаточно

Стандартная модель HTTP — это «запрос → один ответ». Клиент что‑то спросил, сервер один раз ответил, связь закрылась.

Для многих задач этого достаточно: получить текущий статус job’а, сохранить настройки пользователя, взять готовый список подарков, который уже лежит в базе.

Но как только вы делаете долгую операцию, всё начинает скрипеть.

Представьте GiftGenius, который:

  • собирает сигналы из нескольких источников (история покупок, wishlist, соцсети),
  • прогоняет это через пару LLM‑запросов,
  • строит персональный рейтинг из сотни кандидатов.

Всё это может занять десятки секунд. Если вы будете держать обычный HTTP‑запрос 40 секунд и молчать, UX будет как в старых браузерах: пользователь смотрит на крутящийся спиннер и гадает, умерло приложение или ещё «думает».

Кроме UX есть и чисто технические проблемы:

  • таймауты у ChatGPT, у Vercel, у прокси;
  • невозможность отправлять прогресс, partial results и т.п.;
  • невозможность корректно обработать обрыв связи и восстановиться.

Отсюда естественное решение: перейти от одного большого ответа к потоку маленьких кусочков, которые сервер может посылать по мере готовности.

Эти кусочки могут быть:

  • событиями (job.progress, job.completed) — это про SSE;
  • фрагментами одной большой полезной нагрузки (текст отчёта, NDJSON‑строки с подарками) — это про HTTP-stream.

3. SSE (Server‑Sent Events): подписка на события

Начнём с SSE, потому что он во многом «роднее» MCP: сам MCP поверх HTTP использует SSE‑соединение, чтобы пушить события от сервера к клиенту.

Модель SSE на пальцах

SSE — это протокол поверх обычного HTTP:

  1. клиент открывает GET‑запрос на endpoint, который отвечает с Content-Type: text/event-stream;
  2. сервер не закрывает соединение, а периодически пишет туда строки вида:
event: job.progress
data: {"jobId":"123","percent":40}

event: job.completed
data: {"jobId":"123","resultCount":12}
  1. браузерная сторона использует EventSource, который:
    • сам следит за перезапусками соединения;
    • парсит формат event: + data: + двойной перенос строки;
    • вызывает обработчики onmessage / addEventListener("job.progress", ...).

Ключевой момент: канал односторонний. Только сервер шлёт события клиенту. Клиент никаких данных через это соединение не отправляет.

Для ChatGPT Apps эта модель отлично подходит, когда виджет просто хочет «подписаться» на события по jobId и реагировать на прогресс и завершение задачи.

Мини‑пример SSE‑эндпойнта в Next.js 16

Пусть у нас есть route handler для событий прогресса Job’а:

app/api/gift-jobs/[jobId]/events/route.ts

import { NextRequest } from "next/server";

export async function GET(req: NextRequest, { params }: { params: { jobId: string } }) {
  const jobId = params.jobId;

  const stream = new ReadableStream({
    start(controller) {
      // Утилита для отправки SSE-события
      const send = (event: string, data: unknown) => {
        const payload = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
        controller.enqueue(new TextEncoder().encode(payload));
      };

      send("job.started", { jobId });

      let percent = 0;
      const interval = setInterval(() => {
        percent += 20;
        if (percent >= 100) {
          send("job.completed", { jobId, totalGifts: 10 });
          clearInterval(interval);
          controller.close();
        } else {
          send("job.progress", { jobId, percent });
        }
      }, 1000);
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream", // вот тут мы задаем SSE
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}

Это игрушечная имитация: раз в секунду растёт процент и в конце приходит job.completed. Позже вы замените этот таймер на реальные события воркера/очереди, но сама схема останется прежней.

Клиент: подписка на SSE в виджете GiftGenius

Внутри React‑виджета мы можем подписаться на этот поток при наличии jobId. Напомню, что API виджета работает в песочнице ChatGPT, но EventSource там доступен так же, как в обычном браузере.

import { useEffect, useState } from "react";

export function GiftJobProgress({ jobId }: { jobId: string }) {
  const [percent, setPercent] = useState(0);

  useEffect(() => {
    const url = `/api/gift-jobs/${jobId}/events`;
    const es = new EventSource(url);

    es.addEventListener("job.progress", (event) => {
      const data = JSON.parse((event as MessageEvent).data);
      setPercent(data.percent);
    });

    es.addEventListener("job.completed", () => {
      setPercent(100);
      es.close();
    });

    es.onerror = () => {
      // тут можно показать "Проблемы со связью, пробуем переподключиться"
    };

    return () => es.close();
  }, [jobId]);

  return <div>Прогресс подбора подарков: {percent}%</div>;
}

Теперь вы можете связать это с MCP‑инструментом. Инструмент start_gift_job возвращает jobId, а в ToolOutput вашего виджета вы просто рендерите GiftJobProgress.

Автопереподключение и Last‑Event‑ID

EventSource по стандарту пытается автоматически переподключаться, если соединение рвётся. Сервер может использовать стандартное поле SSE id: в событиях, а клиент — заголовок Last-Event-ID, чтобы после переподключения догнать пропущенные события.

Для простого GiftGenius можно пока не реализовывать ни id:, ни отдельный идентификатор событий и просто допускать небольшой «провал» прогресса при reconnect. Но в продакшене, особенно при высокой нагрузке, вам понадобится:

  • добавлять стандартное поле id: в каждое SSE‑событие, чтобы клиент мог передавать Last-Event-ID при переподключении;
  • вводить прикладной event_id в payload события и опираться на него при идемпотентной обработке на клиенте/бэкенде.

Это напрямую сочетается с идемпотентностью: даже если одно и то же job.progress придёт дважды, обработчик, увидев знакомый event_id, не будет повторно выполнять побочные эффекты.

В итоге SSE даёт нам удобную подписку на события вокруг jobId с автопереподключением и контролем дублей через идентификаторы событий. Теперь давайте разберёмся со вторым типом потоков — когда у нас один запрос, но очень большой ответ, который хочется отдавать частями.

4. HTTP‑streaming: постепенно отвечаем на один запрос

Если SSE — это «подписка на независимые события», то HTTP‑стриминг — это «один запрос, один ответ, но ответ растянут во времени и приходит чанками».

Это именно тот механизм, который вы видите, когда используете OpenAI API c stream : true: сервер шлёт JSON‑чанки (часто в SSE‑формате, но логика — «один запрос ↔ поток частичного ответа»), а клиент собирает их в итоговый текст.

В своих API вы можете сделать то же самое для:

  • больших текстовых отчётов (например, объяснение логики выбранных подарков),
  • длинных списков подарков (стримить их частями, а не держать пользователя в ожидании).

Простейший HTTP‑stream endpoint в Next.js

Допустим, нам нужно сгенерировать «объяснение» результата подбора, где LLM пишет длинный текст. Мы хотим стримить его в виджет по мере генерации.

app/api/gift-report/route.ts

import { NextRequest } from "next/server";

export async function POST(req: NextRequest) {
  const stream = new ReadableStream({
    async start(controller) {
      const encoder = new TextEncoder();

      controller.enqueue(encoder.encode("Начинаем анализ...\n"));

      // Здесь могла бы быть реальная генерация LLM с чанками
      for (const line of ["Собираем предпочтения...\n", "Считаем бюджет...\n", "Финальные рекомендации...\n"]) {
        await new Promise((r) => setTimeout(r, 1000));
        controller.enqueue(encoder.encode(line));
      }

      controller.close();
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/plain; charset=utf-8",
      "Transfer-Encoding": "chunked", // Тут мы задаем, что это HTTP/stream
    },
  });
}

Технически Next сам управляет chunked‑кодированием, вам важно только возвращать ReadableStream.

Чтение HTTP‑стрима в виджете через fetch

На клиенте (внутри виджета) можно прочитать поток так:

async function fetchReport(setText: (s: string) => void) {
  const res = await fetch("/api/gift-report", { method: "POST" });
  const reader = res.body!.getReader();
  const decoder = new TextDecoder();

  let acc = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    acc += decoder.decode(value, { stream: true });
    setText(acc); // обновляем UI по мере поступления
  }
}

И обёртка‑компонент:

import { useState } from "react";

export function GiftReport() {
  const [text, setText] = useState("");

  return (
    <div>
      <button onClick={() => fetchReport(setText)}>Сгенерировать отчёт</button>
      <pre style={{ whiteSpace: "pre-wrap" }}>{text}</pre>
    </div>
  );
}

Это классический паттерн: один запрос POST /api/gift-report, в ответ — поток текста, который вы постепенно отображаете.

Стримим JSON, а не текст

Зачастую вам захочется стримить не строки, а JSON‑объекты. Самый популярный формат — NDJSON (Newline‑delimited JSON): каждое событие — это одна JSON‑строка и завершается символом \n.

Пример серверной части:

const stream = new ReadableStream({
  async start(controller) {
    const encoder = new TextEncoder();
    for (let i = 0; i < 5; i++) {
      const chunk = { type: "gift", index: i, name: `Подарок #${i}` };
      controller.enqueue(encoder.encode(JSON.stringify(chunk) + "\n"));
      await new Promise((r) => setTimeout(r, 500));
    }
    controller.close();
  },
});

Клиент читает TextDecoder‑ом, разбивает по \n и парсит отдельные JSON‑объекты.

5. SSE vs HTTP‑stream: в чём разница и как выбирать

К этому моменту картина уже должна быть интуитивной, но давайте всё равно зафиксируем в виде небольшой таблицы.

Характеристика SSE (Server‑Sent Events) HTTP‑stream (chunked)
Инициатор Клиент делает GET и подписывается Клиент делает запрос (GET/POST), сервер стримит ответ
Направление Только сервер → клиент Ответ сервера на конкретный запрос
Семантика Подписка на поток событий (pub/sub) Частичный ответ на один запрос
Встроенный протокол Есть (event:, data:, id: и т.п.) Нет, вы придумываете формат сами (строки, NDJSON, JSON)
Клиентское API EventSource fetch + ReadableStream / response.body
Поддержка reconnect Встроенная (EventSource, Last-Event-ID) Надо реализовывать вручную
Типичные кейсы Прогресс, статусы, notifications по jobId Стриминг текста, больших JSON‑ответов, LLM‑вывода

Если упростить до состояния «правил на пальцах» (аккуратно, без фанатизма):

  • у вас есть job и куча событий вокруг него → SSE;
  • у вас один tool‑вызов возвращает большой результат, который хочется показывать по частям → HTTP‑stream.

Для GiftGenius это будет означать: SSE — для живого прогрессбара и статусов подбора; HTTP‑стрим — для длинной текстовой сводки или для постепенной загрузки длинного списка подарков.

6. Как это стыкуется с MCP и GiftGenius

Вспомним нашу схему из начала лекции: модель ↔ MCP ↔ виджет ↔ backend. Мы уже посмотрели на потоки на уровне виджет ↔ backend, а теперь вернёмся на шаг назад и аккуратно разведём, где именно в этой истории MCP, а где — «просто HTTP».

MCP определяет, как ChatGPT (как MCP‑клиент) общается с вашим MCP‑сервером. Для этого есть транспорт, в котором:

  • ChatGPT открывает SSE‑соединение /sse и получает по нему MCP‑сообщения (ответы, нотификации, события);
  • ChatGPT отправляет MCP‑запросы (call_tool, list_tools и т.д.) на /messages, обычно как POST с JSON‑RPC.

Этот уровень вы уже прошли, когда подключали GiftGenius к ChatGPT.

Теперь, когда мы добавляем асинхронные задачи и UX‑потоки в виджете, у нас появляются два варианта архитектуры.

Вариант первый — «чистый MCP»: MCP‑сервер сам генерирует события job.progress и job.completed; ChatGPT получает их по MCP‑SSE; затем модель сама вызывает ваш виджет с обновлённым контекстом, а виджет рендерит прогресс, не общаясь напрямую с backend. Это максимально «канонический» путь MCP‑events.

Вариант второй — гибридный: MCP‑tool start_gift_job создаёт задачу и возвращает jobId; виджет получает jobId и дальше сам общается с backend по HTTP, подписываясь на SSE‑эндпойнт /api/gift-jobs/{jobId}/events и, при необходимости, запрашивая HTTP‑стрим отчёта. С MCP‑стороны при этом ничего особенного не происходит.

В курсе мы идём по гибридному пути: он лучше встраивается в App Router/Next и проще для локальной отладки. А уже потом вы сможете переезжать на «чистые MCP‑нотификации», когда почувствуете вкус.

7. Reconnect, тайм‑ауты и прочие реальности сети

Пока всё звучало как идеал: открыли SSE или стрим, всё течёт, события прилетают, UX сияет. В реальной жизни сеть любит рвать соединения в неожиданные моменты, а инфраструктура — ставить таймауты.

Что может пойти не так

С SSE и HTTP-stream’ами вы рано или поздно встретитесь с:

  • idle‑таймаутами на прокси: «если по соединению ничего не шло N секунд — закрываем»;
  • перезапуском вашего backend (деплой, авария);
  • нестабильной сетью на стороне пользователя (особенно на мобильных).

Это нормально; важно быть к этому готовыми, а не надеяться «пронесёт».

Стратегия для SSE

У SSE много плюсов именно в этой зоне:

  • EventSource сам переподключается с некоторой задержкой;
  • у вас есть id: и Last-Event-ID, чтобы догнать события.

Минимальный набор практики:

  1. На сервере периодически слать что‑нибудь, наподобие heartbeat, чтобы соединение не считалось полностью idle. Это может быть отдельное событие event: ping или просто комментарий : keep-alive.
  2. На клиенте в onerror показывать пользователю понятный статус вида «Проблемы со связью, пытаемся переподключиться…», а не ломать весь виджет.
  3. При переподключении, если вы используете id:, отдавать с сервера только новые события после этого ID. Для GiftGenius можно начать вообще без id: и просто «перестраивать» состояние по последнему пришедшему job.progress/job.completed.

Стратегия для HTTP‑stream

HTTP‑стрим — это один запрос, поэтому при обрыве вам, по сути, приходится начинать заново:

  • если вы стримите текстовый отчёт, можно просто сказать пользователю «Не удалось получить полный отчёт, попробуйте ещё раз» и начать всё сначала;
  • если вы стримите структурированные данные (NDJSON), можно подумать о механизме resume: например, передавать в запросе offset или cursor, с которого нужно продолжить.

Для начала можно не усложнять и делать простую политику: если стрим ответа оборвался до конца — показываем то, что успели, и кнопку «Продолжить генерацию отчёта», которая отправит новый запрос.

Главное — не оставлять пользователя в состоянии «вечного ожидания».

8. Применяем к GiftGenius: сценарий от начала до конца

Теперь соберём вместе всё, что обсудили про SSE, HTTP‑stream и два варианта архитектуры с MCP, на живом сценарии GiftGenius — от запроса пользователя до готового отчёта.

Пользователь в ChatGPT пишет: «Подбери подарок для фаната настольных игр, бюджет до 100 долларов». Модель решает вызвать GiftGenius. Приложение/агент делает tool‑call start_gift_job на вашем MCP‑сервере. Сервер:

  • записывает job в БД;
  • отправляет её во внутреннюю очередь (подробности очередей и воркеров — следующая лекция, пока считаем, что «кто‑то» её выполняет);
  • синхронно возвращает jobId в ответ на tool‑call.

Виджет GiftGenius получает ToolOutput с jobId и рендерит компонент:

function GiftGeniusRoot({ jobId }: { jobId: string }) {
  return (
    <div>
      <h2>Ищем идеальные подарки...</h2>
      <GiftJobProgress jobId={jobId} />
      <GiftReport />
    </div>
  );
}

Компонент GiftJobProgress подписывается на SSE /api/gift-jobs/{jobId}/events и рисует прогресс. Каждый job.progress обновляет проценты, job.completed — ставит 100% и, возможно, включает кнопку «Показать подробный отчёт».

Компонент GiftReport по клику на кнопку отправляет POST /api/gift-report (передавая туда jobId) и постепенно отображает текстовый отчёт, пока сервер отдаёт чанки HTTP‑стрима.

При обрыве SSE‑соединения виджет показывает мягкое предупреждение, а EventSource пытается переподключиться. При проблемах со стримом отчёта пользователь видит часть отчёта и кнопку «Продолжить генерацию» или «Попробовать ещё раз».

С точки зрения ChatGPT и MCP:

  • MCP видит tool‑вызов start_gift_job и, возможно, потом нотификации о статусах job’а;
  • UX вокруг потоков реализован в основном на уровне HTTP между виджетом и вашим backend.

9. Типичные ошибки при работе с SSE и HTTP‑stream

Ошибка №1: считать SSE и HTTP‑stream «одной и той же штукой».
Да, где‑то внизу у них общий HTTP и chunked‑ответы, но семантика сильно разная. SSE — это подписка на независимые события, которые могут приходить когда угодно, и клиент об этом заранее не знает. HTTP‑стрим — это один конкретный ответ, размазанный по времени. Если попытаться реализовать подписку на множество jobId через один HTTP‑стрим, придётся самим изобретать протокол поверх байтов, фактически заново построив половину SSE.

Ошибка №2: игнорировать авто‑переподключение SSE и не думать про идемпотентность.
Многие пишут «простой» SSE‑сервер: шлют data: ... и не добавляют ни стандартный id: (для Last-Event-ID), ни прикладной event_id в теле события. Потом, при первом же обрыве соединения и переподключении, начинают плодиться дубли событий. Без продуманного event_id и логики «я уже видел это событие» обработчик на клиенте рискует дважды обновлять состояние, дважды показывать один и тот же job.completed или, хуже того, дважды списывать деньги/начислять бонусы.

Ошибка №3: отправлять каждое чихание воркера отдельным SSE‑событием.
Если шлёте прогресс задачи по SSE каждую миллисекунду, то скорее всего убьёте сеть и клиент, а не порадуете пользователя плавной анимацией. Гораздо разумнее агрегировать обновления и отправлять прогресс, скажем, раз в 200500 мс или при изменении стадии процесса. Сама тема троттлинга и backpressure будет ещё обсуждаться, но уже на этом этапе стоит думать про частоту событий.

Ошибка №4: делать сложные протоколы поверх HTTP‑стрима без явного формата.
Типичный анти‑паттерн: стримить JSON без разделителей и пытаться «угадать», где заканчивается один объект и начинается другой. Или смешивать в одном потоке текст и JSON. Лучший путь — выбрать простой и понятный формат: текст по строкам, или NDJSON (один JSON‑объект на строку), или явные разделители. Тогда парсер на клиенте останется вменяемым.

Ошибка №5: забывать про таймауты и «вечные» стримы.
Иногда разработчики делают SSE‑эндпойнты, которые ничего не отправляют по 510 минут, а потом удивляются, что соединения рвутся на пути от пользователя до сервера (балансировщики, API‑шлюзы, корпоративные прокси). Регулярные heartbeat‑события или комментарии позволяют держать соединение живым и вовремя обнаруживать обрывы. А HTTP‑стримы не должны превращаться в бесконечные ответы — для вечных подписок есть SSE.

Ошибка №6: пытаться сделать через HTTP‑стрим сложный pub/sub вместо нормальных событий.
Иногда возникает соблазн: «Сделаем один стрим, по нему будем слать и прогресс, и partial results, и случайные логи». В результате на клиенте появится сложный мультиплексор, который анализирует каждый чанк и решает, к какому jobId он относится. В большинстве случаев проще и надёжнее использовать SSE с событиями по типу job.progress, job.completed и отдельным каналом на job, чем изобретать самодельный мегапротокол поверх HTTP‑стрима.

Ошибка №7: жёстко завязывать UX на том, что поток «никогда не падает».
Любой поток когда‑нибудь оборвётся. Если ваш виджет при этом просто остаётся с вечно анимированным прогрессбаром и без вариантов действий — UX воспринимается как «сломанный». Даже простое сообщение «Похоже, связь прервалась. Попробуйте перезапустить подбор подарков» с кнопкой «Повторить» намного лучше, чем молчание.

1
Задача
ChatGPT Apps, 13 уровень, 1 лекция
Недоступна
HTTP-stream “Отчёт, который дописывается сам”
HTTP-stream “Отчёт, который дописывается сам”
1
Задача
ChatGPT Apps, 13 уровень, 1 лекция
Недоступна
SSE “Прогресс задачи” с keep-alive
SSE “Прогресс задачи” с keep-alive
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ