1. Зачем вообще асинхронные задачи в ChatGPT App
Если бы мир был идеален, любой ваш MCP tool успевал бы за пару сотен миллисекунд. Но в жизни всё интересное — долгое и тяжёлое:
- разбор большого CSV с историей покупок пользователя;
- агрегация данных из нескольких внешних API, каждый из которых то спит, то отвечает с 503;
- построение сложных рекомендаций с кучей промежуточных шагов;
- генерация больших отчётов и презентаций.
Если попытаться запихнуть это в один синхронный tool‑call, вы упрётесь в три проблемы.
Во‑первых, таймауты. ChatGPT‑сессия, HTTP‑инфраструктура, MCP‑клиент — всё это не рассчитано на ответы «через пять минут». Сервер, который держит соединение слишком долго, будет выглядеть как «подвисший» и для ChatGPT, и для пользователя.
Во‑вторых, управление нагрузкой. Если сто пользователей одновременно запустят «супер‑анализ подарков на Новый год», вы не хотите, чтобы MCP‑сервер синхронно держал сто долгих задач прямо в HTTP‑потоках. Вам нужна прослойка, которая умеет принимать всплески, раскладывать задачи по очереди и обрабатывать их несколькими воркерами.
В‑третьих, UX. Пользователь нажал кнопку в виджете GiftGenius и уставился в один спиннер на 40 секунд — ощущение примерно как от старых интернет‑банков. Нам намного приятнее модель «быстрый ответ + прогресс + возможность отмены».
Эти проблемы решаются общей схемой: «запуск → очередь → фон → события».
2. Базовая архитектура async‑job в контексте MCP
Возьмём наш GiftGenius. Допустим, у нас появился новый тяжёлый сценарий: «Глубокий анализ предпочтений по истории покупок и соцсетям друга». Такая штука может работать несколько минут, поэтому:
- Инструмент MCP (tool) принимает параметры запроса от модели.
- Вместо того, чтобы сразу всё считать, он создаёт запись Job в базе.
- Кладёт задачу в очередь.
- Мгновенно отвечает ChatGPT: «Анализ запущен, вот jobId».
- Фоновый воркер забирает задачу из очереди, выполняет тяжёлую работу, по ходу дела шлёт MCP‑события job.progress и job.partial, в конце — job.completed или job.failed.
С точки зрения архитектуры это выглядит примерно так:
flowchart LR
subgraph ChatGPT
U[Пользователь] --> GPT[Модель + ChatGPT UI]
end
GPT -->|call_tool analyze_preferences| MCP[MCP-сервер]
subgraph Backend
MCP -->|создать Job| DB[(БД задач)]
MCP -->|enqueue| Q[Очередь]
W[Воркер] -->|take job| Q
W -->|update status/progress| DB
W -->|MCP events: job.progress/job.completed| MCP
end
MCP -->|SSE events| GPT
Важная мысль: MCP‑сервер — это не обязательно монолит. Часто он выступает как фасад над вашей внутренней асинхронной инфраструктурой: он принимает tool‑calls, создаёт job’ы и шлёт события, а тяжёлую работу делают отдельные процессы‑воркеры.
3. Модель данных для асинхронной задачи
Начнём с простой модели Job. Мы будем использовать TypeScript и условный Node/MCP‑сервер, чтобы вы сразу видели, как это вписывается в ваш стек.
Простейшая модель в памяти/БД может выглядеть так:
// openai/jobs/model.ts
export type JobStatus =
| 'pending'
| 'in_progress'
| 'completed'
| 'failed'
| 'canceled';
export interface GiftJob {
id: string; // jobId
type: 'deep_gift_analysis';
status: JobStatus;
payload: {
recipientProfile: string; // текст/ID профиля
budget: number;
};
result?: unknown; // итоговые рекомендации
error?: string; // причина ошибки
attempts: number; // сколько раз пытались выполнить
createdAt: Date;
updatedAt: Date;
}
В реальном проекте вы будете хранить GiftJob в Postgres, DynamoDB, Firestore или ещё где‑нибудь, но для лекции нам важны поля:
- status — текущее состояние задачи, которое будет отражаться и в событиях, и в UX;
- attempts — счётчик для retry;
- error — для логов и отладки;
- payload — входные данные, которые воркер использует для обработки.
4. Инструмент MCP, который создаёт async‑job
Представим инструмент start_deep_analysis. Раньше он мог бы делать всё синхронно, а теперь только ставит задачу в очередь и возвращает jobId.
// openai/tools/startDeepAnalysis.ts
import { v4 as uuid } from 'uuid';
import { createJobAndEnqueue } from '../jobs/queue';
// Псевдотипы для MCP SDK
type StartDeepAnalysisInput = {
recipientProfile: string;
budget: number;
};
type StartDeepAnalysisOutput = {
jobId: string;
message: string;
};
export async function startDeepAnalysisTool(
input: StartDeepAnalysisInput
): Promise<StartDeepAnalysisOutput> {
const jobId = uuid();
await createJobAndEnqueue({
id: jobId,
type: 'deep_gift_analysis',
status: 'pending',
payload: {
recipientProfile: input.recipientProfile,
budget: input.budget,
},
attempts: 0,
createdAt: new Date(),
updatedAt: new Date(),
});
return {
jobId,
message: `Запустила глубокий анализ. ID задачи: ${jobId}. Я буду присылать обновления по мере готовности.`,
};
}
Здесь важно, что:
- MCP‑инструмент работает быстро: максимум пара запросов к БД/очереди;
- он возвращает структурированный ответ с jobId, который ChatGPT может использовать в своём «объяснении пользователю» и который виджет GiftGenius может сохранить в своём widgetState.
Ваша JSON Schema для этого инструмента просто описывает jobId как строку и message как человекочитаемый текст — модель поймёт, что это идентификатор задачи, и сможет ссылаться на него в следующих шагах диалога.
5. Простая очередь и воркер: учебная версия
Чтобы не тащить сейчас Redis, RabbitMQ и всё остальное, сделаем упрощённую ин‑мемори очередь. В реальном проде, конечно, это будет отдельный сервис (SQS/BullMQ/Cloud Tasks и т.п.), но логика останется такой же.
Сначала заготовка очереди:
// openai/jobs/queue.ts
import type { GiftJob } from './model';
const jobs = new Map<string, GiftJob>(); // "БД" в памяти
export const queue: string[] = []; // упрощённая очередь по id
export async function createJobAndEnqueue(job: GiftJob) {
jobs.set(job.id, job);
queue.push(job.id);
}
export function getJob(id: string): GiftJob | undefined {
return jobs.get(id);
}
export function updateJob(id: string, patch: Partial<GiftJob>) {
const job = jobs.get(id);
if (!job) return;
const updated: GiftJob = { ...job, ...patch, updatedAt: new Date() };
jobs.set(id, updated);
}
Теперь примитивный воркер, который периодически заглядывает в очередь, берёт job и обрабатывает её:
// openai/jobs/worker.ts
import { getJob, updateJob } from './queue';
import { emitJobEvent } from './events';
async function processJob(jobId: string) {
const job = getJob(jobId);
if (!job) return;
updateJob(jobId, { status: 'in_progress' });
await emitJobEvent(jobId, 'job.started', {});
try {
// Тут вызываем долгую бизнес-логику
const result = await doDeepGiftAnalysis(job.id, job.payload);
updateJob(jobId, { status: 'completed', result });
await emitJobEvent(jobId, 'job.completed', { resultSummary: summarize(result) });
} catch (err) {
updateJob(jobId, {
status: 'failed',
error: (err as Error).message,
});
await emitJobEvent(jobId, 'job.failed', { error: 'Internal error' });
}
}
И сам «циклический» воркер, который можно запустить где‑то при старте приложения:
// openai/jobs/workerLoop.ts
import { queue } from './queue';
import { processJob } from './worker';
export function startWorkerLoop() {
setInterval(async () => {
const jobId = queue.shift(); // по-хорошему нужна защита от гонок
if (!jobId) return;
await processJob(jobId);
}, 1000); // раз в секунду проверяем очередь
}
Это учебный пример. В реальном мире вместо setInterval будет нормальная очередь, которая сама «будит» воркера, когда появляется новое сообщение. Но общая идея хорошо видна: воркер отделён от MCP‑инструмента, работает фоном и общается с MCP‑сервером через события.
6. Генерация MCP‑событий из воркера
В предыдущих лекциях вы уже видели формат MCP‑событий: тип (type), уникальный event_id, timestamp, job_id и payload. Теперь покажем, как воркер может вызывать helper emitJobEvent, а тот — доставлять события до ChatGPT через SSE‑канал MCP‑сервера.
Пример простого хелпера:
// openai/jobs/events.ts
import { randomUUID } from 'crypto';
import { sendMcpEvent } from '../mcp/eventBus';
export async function emitJobEvent(
jobId: string,
type: 'job.started' | 'job.progress' | 'job.completed' | 'job.failed',
payload: unknown
) {
const event = {
event_id: randomUUID(),
type,
job_id: jobId,
timestamp: new Date().toISOString(),
payload,
};
await sendMcpEvent(event);
}
А sendMcpEvent внутри MCP‑сервера уже знает, как именно пихнуть это событие в SSEServerTransport из MCP SDK: например, через локальную шину событий или Redis Pub/Sub, как мы разбирали в модуле 12.
Ключевая мысль: воркер не общается с ChatGPT напрямую. Он общается с MCP‑сервером, а тот держит SSE‑соединения и пересылает события клиентам.
7. Прогресс и partial results из воркера
Теперь к самому вкусному: прогресс и частичные результаты. В GiftGenius долгий анализ можно разбить на этапы:
- сбор и нормализация данных;
- построение базовых сегментов;
- генерация первичных подарочных идей;
- финальный ранкинг и текстовое объяснение.
На каждом этапе мы можем отправлять job.progress и иногда — job.partial, чтобы UI уже показывал первые подарки.
Условный воркер:
async function doDeepGiftAnalysis(jobId: string, payload: GiftJob['payload']) {
await emitJobEvent(jobId, 'job.progress', { step: 1, totalSteps: 4 });
const normalized = await collectAndNormalizeData(payload);
await emitJobEvent(jobId, 'job.progress', { step: 2, totalSteps: 4 });
const roughGifts = await generateInitialGifts(normalized);
await emitJobEvent(jobId, 'job.partial', { gifts: roughGifts.slice(0, 3) });
await emitJobEvent(jobId, 'job.progress', { step: 3, totalSteps: 4 });
const finalGifts = await rerankAndBeautify(roughGifts);
await emitJobEvent(jobId, 'job.progress', { step: 4, totalSteps: 4 });
return finalGifts;
}
Виджет, слушая события, может сначала показать 3 «черновых» подарка с пометкой «Ещё уточняем», а после job.completed — обновить список и убрать индикатор загрузки. Всё это идеально ложится на UX‑паттерны, о которых мы говорили в лекции 3.
8. Retry‑логика для воркеров
Теперь самое нервное место: ошибки и повторы.
Представьте, что воркер при обработке задачи ходит во внешний API списка товаров, а тот периодически отвечает 500 или 429. Бросать задачу после первой же ошибки — странно. Но и перезапускать бесконечно нельзя: так вы устроите DDoS по самим себе или по стороннему сервису.
Нам нужна стратегия retry с экспоненциальной задержкой и ограничением числа попыток.
Начнём с классификации ошибок, которая будет нам нужна и дальше в курсе:
- временные (transient) — таймауты, 500, 503, 429;
- постоянные (permanent) — неверный вход, несуществующий ресурс;
- фатальные (bug) — баги кода, TypeError, неожиданное исключение.
Повторять имеет смысл только временные ошибки. Остальные нужно честно помечать как 'failed'.
Упростим и сделаем хелпер:
// openai/jobs/retry.ts
export function shouldRetry(error: unknown): boolean {
if (!(error instanceof Error)) return false;
// Условно: HTTP 5xx или 429
return /5\d\d|429/.test(error.message);
}
export function getDelayMs(base: number, attempt: number): number {
const jitter = Math.random() * 100; // небольшой шум
return base * 2 ** attempt + jitter; // экспоненциальный backoff
}
Теперь обновим воркер так, чтобы он учитывал attempts в GiftJob:
// openai/jobs/worker.ts
import { getJob, updateJob } from './queue';
import { emitJobEvent } from './events';
import { shouldRetry, getDelayMs } from './retry';
const MAX_ATTEMPTS = 5;
export async function processJob(jobId: string) {
const job = getJob(jobId);
if (!job) return;
updateJob(jobId, { status: 'in_progress' });
try {
const result = await doDeepGiftAnalysis(job.id, job.payload);
updateJob(jobId, { status: 'completed', result });
await emitJobEvent(jobId, 'job.completed', {
resultSummary: summarize(result),
});
} catch (err) {
const attempts = job.attempts + 1;
const error = err as Error;
if (attempts <= MAX_ATTEMPTS && shouldRetry(error)) {
const delay = getDelayMs(1000, attempts); // 1s,2s,4s...
updateJob(jobId, { attempts, status: 'pending', error: error.message });
setTimeout(() => {
// В реальной очереди вы бы "переenqueue'или" job с задержкой
processJob(jobId);
}, delay);
await emitJobEvent(jobId, 'job.progress', {
retry: attempts,
nextAttemptInMs: delay,
});
} else {
updateJob(jobId, { status: 'failed', error: error.message });
await emitJobEvent(jobId, 'job.failed', {
error: 'Не удалось завершить анализ после нескольких попыток',
});
}
}
}
Здесь важно несколько моментов.
Во‑первых, attempts хранится в самой задаче — это удобно и для логирования, и для наблюдаемости (на графике будет красиво видно, сколько задач проходят с ретраями).
Во‑вторых, при каждом retry мы отправляем job.progress с явным указанием, что это попытка №N. Модель может использовать эту информацию, чтобы объяснить пользователю, что «сервер подарков отвечает нестабильно, пробую ещё раз».
В‑третьих, мы гарантируем, что в любом случае либо отправится job.completed, либо job.failed. Никаких висящих задач «ни жив, ни мёртв».
Отмена ('canceled') — ещё один важный статус. В учебных примерах мы её не реализуем, но в продакшене он обычно ставится по инициативе пользователя (кнопка «Отменить» в виджете) или по таймауту. В таком случае воркер при следующем взятии задачи из очереди видит status: 'canceled', не запускает обработку, а MCP‑сервер отправляет финальное событие job.canceled.
9. Идемпотентность и retry: не наступаем дважды на те же грабли
Когда вы вводите retry, сразу появляется риск «сделать одно и то же дважды». В commerce‑модулях это критично (например, двойное списание денег), но и в GiftGenius тоже есть сценарии, где повтор — плохо: отправка двух одинаковых писем другу, дублирование записи в вашей внутренней аналитике и т.п.
Поэтому нужно ориентироваться на два принципа.
Первый: обработчик job должен быть идемпотентным.
Если вы вызываете его с одним и тем же jobId несколько раз (в рамках retry или по ошибке), мир не должен ломаться. Для этого:
- все побочные эффекты (запись в БД, отправка писем, создание заказов) должны быть завязаны на jobId или другой естественный идентификатор, чтобы в коде можно было быстро проверить, делали ли мы уже этот шаг;
- если job.status уже 'completed' или 'failed', повторный вызов можно либо игнорировать, либо просто отдать уже готовый результат.
Пример простой защиты:
export async function processJob(jobId: string) {
const job = getJob(jobId);
if (!job) return;
if (job.status === 'completed' || job.status === 'failed') {
// Задача уже успешно или окончательно завершена
return;
}
// ... остальной код
}
Второй: события тоже должны быть идемпотентны.
Мы уже говорили про event_id и то, что клиент может фильтровать дубликаты, но на стороне сервера тоже стоит быть аккуратным: при рестарте воркера или восстановлении из очереди не спамьте клиенту теми же job.progress без необходимости.
10. Где находятся очереди и воркеры в вашей архитектуре
На картинке всё красиво, но где физически крутится воркер? Есть несколько типичных вариантов.
Интегрированный воркер: MCP‑сервер и воркер — один и тот же процесс/деплой. Он же принимает tool‑calls, он же поднимает worker loop. Плюс в простоте: меньше сервисов, проще деплоить. Минус — масштабирование: чтобы добавить воркеров, приходится масштабировать весь MCP‑сервер.
Выделенный воркер: MCP‑сервер — один сервис, воркеры — другой. Между ними — очередь и, возможно, Pub/Sub для событий. Это то, о чём много пишут в контексте BullMQ/Redis и MCP‑событий: MCP сервер подписан на Redis‑канал 'mcp:events', воркеры публикуют туда события.
Комбинированный вариант: один инстанс MCP‑сервера крутит и воркер, остальные инстансы — только HTTP/SSE. Это может быть полезным, если вы деплоите на Vercel или другой serverless‑платформе, где постоянные фоновые процессы — удовольствия ниже среднего.
В нашем учебном GiftGenius пока допустим первый вариант: MCP‑сервер + один простой воркер в процессе. Когда вы дойдёте до модулей про продакшен и масштабирование, можно будет мигрировать воркеры в отдельный сервис.
11. Пример: полный async‑пайплайн GiftGenius
Давайте связно проговорим, что происходит, когда пользователь в чате пишет:
«Мне нужен сложный подбор подарка для фаната космоса, с учётом его прошлых покупок».
- Модель решает вызвать инструмент start_deep_analysis с параметрами профиля получателя и бюджета.
- Инструмент создаёт GiftJob в БД со статусом 'pending', кладёт его в очередь и возвращает jobId + сообщение подтверждения.
- ChatGPT объясняет пользователю, что анализ запущен, и может передать jobId в виджет GiftGenius.
- Виджет подписывается на события по этому jobId через SSE, показывает прогресс‑бар и статус «Собираем и анализируем данные».
- Воркер, увидев новый job в очереди, обновляет статус на 'in_progress' и шлёт job.started.
- В процессе работы он несколько раз посылает job.progress (этапы) и job.partial (первые 2–3 подарка).
- Если где‑то по пути падает внешний API, воркер пробует ещё раз с экспоненциальным backoff, обновляя attempts и отправляя событие с информацией о повторной попытке.
- В конце он либо отправляет job.completed с кратким summary и финальными рекомендациями, либо job.failed с понятным объяснением.
- Виджет на основании этих событий обновляет UI, а ChatGPT может сформировать текстовую сводку и предложить follow‑up: «Показать ещё идеи», «Сузить бюджет», «Сменить тип подарка».
С точки зрения пользователя это «живой» долгий процесс, который под контролем. С точки зрения backend’а — нормальный async‑пайплайн с очередью, воркерами и retry.
12. Небольшое упражнение (для самостоятельной практики)
Если хотите закрепить материал, попробуйте для GiftGenius:
- придумать схему таблицы jobs для реальной БД: какие индексы вам нужны, какие поля будут участвовать в фильтрации (по пользователю, по статусу, по дате создания);
- набросать TypeScript‑тип для HTTP‑endpoint’а /api/jobs/:id, чтобы виджет мог в крайнем случае делать поллинг статуса, если SSE недоступен;
- описать политику retry: сколько попыток, базовая задержка, что делать с задачами, которые всё равно упали (простая dead‑letter‑таблица или логирование + алерт).
Это задание вам пригодится позже, когда в модулях про продакшен и наблюдаемость мы будем говорить про метрики типа «сколько задач зависло в статусе pending дольше N минут».
13. Типичные ошибки при работе с асинхронными задачами
Ошибка №1: делать всё синхронно в tool‑call.
Самая распространённая ловушка — попытаться впихнуть всю тяжёлую работу в один MCP‑инструмент без очереди. Пока запросов мало, это вроде бы работает. Как только нагрузка растёт или внешние API начинают тормозить, вы ловите таймауты, зависания чата и крайне нервный UX. Любую операцию, которая потенциально может длиться десятки секунд и дольше, лучше сразу проектировать как async‑job с jobId.
Ошибка №2: отсутствие явной модели Job.
Иногда разработчики пытаются обойтись «просто сообщениями в очереди», не храня состояние задач в БД. В итоге трудно ответить на базовые вопросы: «каков статус задачи?», «сколько раз мы пытались её выполнить?», «почему она упала?». Чёткая модель Job с полями status, attempts, error, createdAt — это основа для дебага, мониторинга и UX.
Ошибка №3: отсутствие retry или, наоборот, бесконечные повторы.
Кто‑то вообще не делает retry и падает при первом же 500, кто‑то делает while (!success) и не ограничивает число попыток. В первом случае вы теряете кучу задач из‑за кратковременных сбоев, во втором — создаёте себе «шторма» нагрузки и рискуете заблокировать внешние API. Нужна разумная середина: ограниченное число попыток + экспоненциальная задержка + разделение временных и постоянных ошибок.
Ошибка №4: неидемпотентные обработчики.
Если при каждой попытке вы, например, создаёте новую запись в сторонней системе без проверки, выполняете один и тот же платёж или отправляете одно и то же письмо — retry быстро становится проблемой. Обработчик должен уметь понять, что задача с этим jobId уже была успешно доведена до конца, и не повторять опасные побочные эффекты.
Ошибка №5: отсутствие событий при ошибках.
Бывает, что воркер падает с неожиданным исключением, логирует его в консоль и на этом всё. Пользователь сидит и вечно ждёт job.completed, не зная, что всё уже давно умерло. Любая ветка, где обработка завершилась с ошибкой, должна в итоге приводить к job.failed и обновлению статуса Job в БД. Без этого ваши потоки MCP превращаются в односторонний «чёрный ящик».
Ошибка №6: слишком частые события прогресса.
Желание «быть честным» и слать job.progress на каждый один процент выполнения приводит к перегрузке сети, клиента и MCP‑сервера. Лучше отправлять прогресс при смене этапа или крупной дельте (например, каждые 10%), а всё остальное хранить только во внутренних логах.
Ошибка №7: использование ин‑мемори очереди в продакшене.
Учебный пример с queue: string[] и Map — хорош для понимания архитектуры, но в реальной продакшен‑системе он развалится при первом же рестарте процесса или падении сервера. Для серьёзной эксплуатации нужны внешние очереди и хранилища: SQS, Pub/Sub, RabbitMQ, Redis Streams и т.п. Ин‑мемори варианты годятся только для локальной разработки и простых демо.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ