JavaRush /Курсы /ChatGPT Apps /Асинхронные задачи: очереди, воркеры, повторные отправки ...

Асинхронные задачи: очереди, воркеры, повторные отправки (retry)

ChatGPT Apps
13 уровень , 3 лекция
Открыта

1. Зачем вообще асинхронные задачи в ChatGPT App

Если бы мир был идеален, любой ваш MCP tool успевал бы за пару сотен миллисекунд. Но в жизни всё интересное — долгое и тяжёлое:

  • разбор большого CSV с историей покупок пользователя;
  • агрегация данных из нескольких внешних API, каждый из которых то спит, то отвечает с 503;
  • построение сложных рекомендаций с кучей промежуточных шагов;
  • генерация больших отчётов и презентаций.

Если попытаться запихнуть это в один синхронный tool‑call, вы упрётесь в три проблемы.

Во‑первых, таймауты. ChatGPT‑сессия, HTTP‑инфраструктура, MCP‑клиент — всё это не рассчитано на ответы «через пять минут». Сервер, который держит соединение слишком долго, будет выглядеть как «подвисший» и для ChatGPT, и для пользователя.

Во‑вторых, управление нагрузкой. Если сто пользователей одновременно запустят «супер‑анализ подарков на Новый год», вы не хотите, чтобы MCP‑сервер синхронно держал сто долгих задач прямо в HTTP‑потоках. Вам нужна прослойка, которая умеет принимать всплески, раскладывать задачи по очереди и обрабатывать их несколькими воркерами.

В‑третьих, UX. Пользователь нажал кнопку в виджете GiftGenius и уставился в один спиннер на 40 секунд — ощущение примерно как от старых интернет‑банков. Нам намного приятнее модель «быстрый ответ + прогресс + возможность отмены».

Эти проблемы решаются общей схемой: «запуск → очередь → фон → события».

2. Базовая архитектура async‑job в контексте MCP

Возьмём наш GiftGenius. Допустим, у нас появился новый тяжёлый сценарий: «Глубокий анализ предпочтений по истории покупок и соцсетям друга». Такая штука может работать несколько минут, поэтому:

  1. Инструмент MCP (tool) принимает параметры запроса от модели.
  2. Вместо того, чтобы сразу всё считать, он создаёт запись Job в базе.
  3. Кладёт задачу в очередь.
  4. Мгновенно отвечает ChatGPT: «Анализ запущен, вот jobId».
  5. Фоновый воркер забирает задачу из очереди, выполняет тяжёлую работу, по ходу дела шлёт MCP‑события job.progress и job.partial, в конце — job.completed или job.failed.

С точки зрения архитектуры это выглядит примерно так:

flowchart LR
    subgraph ChatGPT
      U[Пользователь] --> GPT[Модель + ChatGPT UI]
    end

    GPT -->|call_tool analyze_preferences| MCP[MCP-сервер]

    subgraph Backend
      MCP -->|создать Job| DB[(БД задач)]
      MCP -->|enqueue| Q[Очередь]
      W[Воркер] -->|take job| Q
      W -->|update status/progress| DB
      W -->|MCP events: job.progress/job.completed| MCP
    end

    MCP -->|SSE events| GPT

Важная мысль: MCP‑сервер — это не обязательно монолит. Часто он выступает как фасад над вашей внутренней асинхронной инфраструктурой: он принимает tool‑calls, создаёт job’ы и шлёт события, а тяжёлую работу делают отдельные процессы‑воркеры.

3. Модель данных для асинхронной задачи

Начнём с простой модели Job. Мы будем использовать TypeScript и условный Node/MCP‑сервер, чтобы вы сразу видели, как это вписывается в ваш стек.

Простейшая модель в памяти/БД может выглядеть так:

// openai/jobs/model.ts
export type JobStatus =
  | 'pending'
  | 'in_progress'
  | 'completed'
  | 'failed'
  | 'canceled';

export interface GiftJob {
  id: string;                // jobId
  type: 'deep_gift_analysis';
  status: JobStatus;
  payload: {
    recipientProfile: string;  // текст/ID профиля
    budget: number;
  };
  result?: unknown;          // итоговые рекомендации
  error?: string;            // причина ошибки
  attempts: number;          // сколько раз пытались выполнить
  createdAt: Date;
  updatedAt: Date;
}

В реальном проекте вы будете хранить GiftJob в Postgres, DynamoDB, Firestore или ещё где‑нибудь, но для лекции нам важны поля:

  • status — текущее состояние задачи, которое будет отражаться и в событиях, и в UX;
  • attempts — счётчик для retry;
  • error — для логов и отладки;
  • payload — входные данные, которые воркер использует для обработки.

4. Инструмент MCP, который создаёт async‑job

Представим инструмент start_deep_analysis. Раньше он мог бы делать всё синхронно, а теперь только ставит задачу в очередь и возвращает jobId.

// openai/tools/startDeepAnalysis.ts
import { v4 as uuid } from 'uuid';
import { createJobAndEnqueue } from '../jobs/queue';

// Псевдотипы для MCP SDK
type StartDeepAnalysisInput = {
  recipientProfile: string;
  budget: number;
};

type StartDeepAnalysisOutput = {
  jobId: string;
  message: string;
};

export async function startDeepAnalysisTool(
  input: StartDeepAnalysisInput
): Promise<StartDeepAnalysisOutput> {
  const jobId = uuid();

  await createJobAndEnqueue({
    id: jobId,
    type: 'deep_gift_analysis',
    status: 'pending',
    payload: {
      recipientProfile: input.recipientProfile,
      budget: input.budget,
    },
    attempts: 0,
    createdAt: new Date(),
    updatedAt: new Date(),
  });

  return {
    jobId,
    message: `Запустила глубокий анализ. ID задачи: ${jobId}. Я буду присылать обновления по мере готовности.`,
  };
}

Здесь важно, что:

  • MCP‑инструмент работает быстро: максимум пара запросов к БД/очереди;
  • он возвращает структурированный ответ с jobId, который ChatGPT может использовать в своём «объяснении пользователю» и который виджет GiftGenius может сохранить в своём widgetState.

Ваша JSON Schema для этого инструмента просто описывает jobId как строку и message как человекочитаемый текст — модель поймёт, что это идентификатор задачи, и сможет ссылаться на него в следующих шагах диалога.

5. Простая очередь и воркер: учебная версия

Чтобы не тащить сейчас Redis, RabbitMQ и всё остальное, сделаем упрощённую ин‑мемори очередь. В реальном проде, конечно, это будет отдельный сервис (SQS/BullMQ/Cloud Tasks и т.п.), но логика останется такой же.

Сначала заготовка очереди:

// openai/jobs/queue.ts
import type { GiftJob } from './model';

const jobs = new Map<string, GiftJob>();   // "БД" в памяти
export const queue: string[] = [];         // упрощённая очередь по id

export async function createJobAndEnqueue(job: GiftJob) {
  jobs.set(job.id, job);
  queue.push(job.id);
}

export function getJob(id: string): GiftJob | undefined {
  return jobs.get(id);
}

export function updateJob(id: string, patch: Partial<GiftJob>) {
  const job = jobs.get(id);
  if (!job) return;
  const updated: GiftJob = { ...job, ...patch, updatedAt: new Date() };
  jobs.set(id, updated);
}

Теперь примитивный воркер, который периодически заглядывает в очередь, берёт job и обрабатывает её:

// openai/jobs/worker.ts
import { getJob, updateJob } from './queue';
import { emitJobEvent } from './events';

async function processJob(jobId: string) {
  const job = getJob(jobId);
  if (!job) return;

  updateJob(jobId, { status: 'in_progress' });
  await emitJobEvent(jobId, 'job.started', {});

  try {
    // Тут вызываем долгую бизнес-логику
    const result = await doDeepGiftAnalysis(job.id, job.payload);

    updateJob(jobId, { status: 'completed', result });
    await emitJobEvent(jobId, 'job.completed', { resultSummary: summarize(result) });
  } catch (err) {
    updateJob(jobId, {
      status: 'failed',
      error: (err as Error).message,
    });
    await emitJobEvent(jobId, 'job.failed', { error: 'Internal error' });
  }
}

И сам «циклический» воркер, который можно запустить где‑то при старте приложения:

// openai/jobs/workerLoop.ts
import { queue } from './queue';
import { processJob } from './worker';

export function startWorkerLoop() {
  setInterval(async () => {
    const jobId = queue.shift(); // по-хорошему нужна защита от гонок
    if (!jobId) return;

    await processJob(jobId);
  }, 1000); // раз в секунду проверяем очередь
}

Это учебный пример. В реальном мире вместо setInterval будет нормальная очередь, которая сама «будит» воркера, когда появляется новое сообщение. Но общая идея хорошо видна: воркер отделён от MCP‑инструмента, работает фоном и общается с MCP‑сервером через события.

6. Генерация MCP‑событий из воркера

В предыдущих лекциях вы уже видели формат MCP‑событий: тип (type), уникальный event_id, timestamp, job_id и payload. Теперь покажем, как воркер может вызывать helper emitJobEvent, а тот — доставлять события до ChatGPT через SSE‑канал MCP‑сервера.

Пример простого хелпера:

// openai/jobs/events.ts
import { randomUUID } from 'crypto';
import { sendMcpEvent } from '../mcp/eventBus';

export async function emitJobEvent(
  jobId: string,
  type: 'job.started' | 'job.progress' | 'job.completed' | 'job.failed',
  payload: unknown
) {
  const event = {
    event_id: randomUUID(),
    type,
    job_id: jobId,
    timestamp: new Date().toISOString(),
    payload,
  };

  await sendMcpEvent(event);
}

А sendMcpEvent внутри MCP‑сервера уже знает, как именно пихнуть это событие в SSEServerTransport из MCP SDK: например, через локальную шину событий или Redis Pub/Sub, как мы разбирали в модуле 12.

Ключевая мысль: воркер не общается с ChatGPT напрямую. Он общается с MCP‑сервером, а тот держит SSE‑соединения и пересылает события клиентам.

7. Прогресс и partial results из воркера

Теперь к самому вкусному: прогресс и частичные результаты. В GiftGenius долгий анализ можно разбить на этапы:

  • сбор и нормализация данных;
  • построение базовых сегментов;
  • генерация первичных подарочных идей;
  • финальный ранкинг и текстовое объяснение.

На каждом этапе мы можем отправлять job.progress и иногда — job.partial, чтобы UI уже показывал первые подарки.

Условный воркер:

async function doDeepGiftAnalysis(jobId: string, payload: GiftJob['payload']) {
  await emitJobEvent(jobId, 'job.progress', { step: 1, totalSteps: 4 });

  const normalized = await collectAndNormalizeData(payload);
  await emitJobEvent(jobId, 'job.progress', { step: 2, totalSteps: 4 });

  const roughGifts = await generateInitialGifts(normalized);
  await emitJobEvent(jobId, 'job.partial', { gifts: roughGifts.slice(0, 3) });

  await emitJobEvent(jobId, 'job.progress', { step: 3, totalSteps: 4 });

  const finalGifts = await rerankAndBeautify(roughGifts);
  await emitJobEvent(jobId, 'job.progress', { step: 4, totalSteps: 4 });

  return finalGifts;
}

Виджет, слушая события, может сначала показать 3 «черновых» подарка с пометкой «Ещё уточняем», а после job.completed — обновить список и убрать индикатор загрузки. Всё это идеально ложится на UX‑паттерны, о которых мы говорили в лекции 3.

8. Retry‑логика для воркеров

Теперь самое нервное место: ошибки и повторы.

Представьте, что воркер при обработке задачи ходит во внешний API списка товаров, а тот периодически отвечает 500 или 429. Бросать задачу после первой же ошибки — странно. Но и перезапускать бесконечно нельзя: так вы устроите DDoS по самим себе или по стороннему сервису.

Нам нужна стратегия retry с экспоненциальной задержкой и ограничением числа попыток.

Начнём с классификации ошибок, которая будет нам нужна и дальше в курсе:

  • временные (transient) — таймауты, 500, 503, 429;
  • постоянные (permanent) — неверный вход, несуществующий ресурс;
  • фатальные (bug) — баги кода, TypeError, неожиданное исключение.

Повторять имеет смысл только временные ошибки. Остальные нужно честно помечать как 'failed'.

Упростим и сделаем хелпер:

// openai/jobs/retry.ts
export function shouldRetry(error: unknown): boolean {
  if (!(error instanceof Error)) return false;
  // Условно: HTTP 5xx или 429
  return /5\d\d|429/.test(error.message);
}

export function getDelayMs(base: number, attempt: number): number {
  const jitter = Math.random() * 100;   // небольшой шум
  return base * 2 ** attempt + jitter; // экспоненциальный backoff
}

Теперь обновим воркер так, чтобы он учитывал attempts в GiftJob:

// openai/jobs/worker.ts
import { getJob, updateJob } from './queue';
import { emitJobEvent } from './events';
import { shouldRetry, getDelayMs } from './retry';

const MAX_ATTEMPTS = 5;

export async function processJob(jobId: string) {
  const job = getJob(jobId);
  if (!job) return;

  updateJob(jobId, { status: 'in_progress' });

  try {
    const result = await doDeepGiftAnalysis(job.id, job.payload);

    updateJob(jobId, { status: 'completed', result });
    await emitJobEvent(jobId, 'job.completed', {
      resultSummary: summarize(result),
    });
  } catch (err) {
    const attempts = job.attempts + 1;
    const error = err as Error;

    if (attempts <= MAX_ATTEMPTS && shouldRetry(error)) {
      const delay = getDelayMs(1000, attempts); // 1s,2s,4s...

      updateJob(jobId, { attempts, status: 'pending', error: error.message });

      setTimeout(() => {
        // В реальной очереди вы бы "переenqueue'или" job с задержкой
        processJob(jobId);
      }, delay);

      await emitJobEvent(jobId, 'job.progress', {
        retry: attempts,
        nextAttemptInMs: delay,
      });
    } else {
      updateJob(jobId, { status: 'failed', error: error.message });
      await emitJobEvent(jobId, 'job.failed', {
        error: 'Не удалось завершить анализ после нескольких попыток',
      });
    }
  }
}

Здесь важно несколько моментов.

Во‑первых, attempts хранится в самой задаче — это удобно и для логирования, и для наблюдаемости (на графике будет красиво видно, сколько задач проходят с ретраями).

Во‑вторых, при каждом retry мы отправляем job.progress с явным указанием, что это попытка №N. Модель может использовать эту информацию, чтобы объяснить пользователю, что «сервер подарков отвечает нестабильно, пробую ещё раз».

В‑третьих, мы гарантируем, что в любом случае либо отправится job.completed, либо job.failed. Никаких висящих задач «ни жив, ни мёртв».

Отмена ('canceled') — ещё один важный статус. В учебных примерах мы её не реализуем, но в продакшене он обычно ставится по инициативе пользователя (кнопка «Отменить» в виджете) или по таймауту. В таком случае воркер при следующем взятии задачи из очереди видит status: 'canceled', не запускает обработку, а MCP‑сервер отправляет финальное событие job.canceled.

9. Идемпотентность и retry: не наступаем дважды на те же грабли

Когда вы вводите retry, сразу появляется риск «сделать одно и то же дважды». В commerce‑модулях это критично (например, двойное списание денег), но и в GiftGenius тоже есть сценарии, где повтор — плохо: отправка двух одинаковых писем другу, дублирование записи в вашей внутренней аналитике и т.п.

Поэтому нужно ориентироваться на два принципа.

Первый: обработчик job должен быть идемпотентным.

Если вы вызываете его с одним и тем же jobId несколько раз (в рамках retry или по ошибке), мир не должен ломаться. Для этого:

  • все побочные эффекты (запись в БД, отправка писем, создание заказов) должны быть завязаны на jobId или другой естественный идентификатор, чтобы в коде можно было быстро проверить, делали ли мы уже этот шаг;
  • если job.status уже 'completed' или 'failed', повторный вызов можно либо игнорировать, либо просто отдать уже готовый результат.

Пример простой защиты:

export async function processJob(jobId: string) {
  const job = getJob(jobId);
  if (!job) return;

  if (job.status === 'completed' || job.status === 'failed') {
    // Задача уже успешно или окончательно завершена
    return;
  }

  // ... остальной код
}

Второй: события тоже должны быть идемпотентны.

Мы уже говорили про event_id и то, что клиент может фильтровать дубликаты, но на стороне сервера тоже стоит быть аккуратным: при рестарте воркера или восстановлении из очереди не спамьте клиенту теми же job.progress без необходимости.

10. Где находятся очереди и воркеры в вашей архитектуре

На картинке всё красиво, но где физически крутится воркер? Есть несколько типичных вариантов.

Интегрированный воркер: MCP‑сервер и воркер — один и тот же процесс/деплой. Он же принимает tool‑calls, он же поднимает worker loop. Плюс в простоте: меньше сервисов, проще деплоить. Минус — масштабирование: чтобы добавить воркеров, приходится масштабировать весь MCP‑сервер.

Выделенный воркер: MCP‑сервер — один сервис, воркеры — другой. Между ними — очередь и, возможно, Pub/Sub для событий. Это то, о чём много пишут в контексте BullMQ/Redis и MCP‑событий: MCP сервер подписан на Redis‑канал 'mcp:events', воркеры публикуют туда события.

Комбинированный вариант: один инстанс MCP‑сервера крутит и воркер, остальные инстансы — только HTTP/SSE. Это может быть полезным, если вы деплоите на Vercel или другой serverless‑платформе, где постоянные фоновые процессы — удовольствия ниже среднего.

В нашем учебном GiftGenius пока допустим первый вариант: MCP‑сервер + один простой воркер в процессе. Когда вы дойдёте до модулей про продакшен и масштабирование, можно будет мигрировать воркеры в отдельный сервис.

11. Пример: полный async‑пайплайн GiftGenius

Давайте связно проговорим, что происходит, когда пользователь в чате пишет:

«Мне нужен сложный подбор подарка для фаната космоса, с учётом его прошлых покупок».

  1. Модель решает вызвать инструмент start_deep_analysis с параметрами профиля получателя и бюджета.
  2. Инструмент создаёт GiftJob в БД со статусом 'pending', кладёт его в очередь и возвращает jobId + сообщение подтверждения.
  3. ChatGPT объясняет пользователю, что анализ запущен, и может передать jobId в виджет GiftGenius.
  4. Виджет подписывается на события по этому jobId через SSE, показывает прогресс‑бар и статус «Собираем и анализируем данные».
  5. Воркер, увидев новый job в очереди, обновляет статус на 'in_progress' и шлёт job.started.
  6. В процессе работы он несколько раз посылает job.progress (этапы) и job.partial (первые 23 подарка).
  7. Если где‑то по пути падает внешний API, воркер пробует ещё раз с экспоненциальным backoff, обновляя attempts и отправляя событие с информацией о повторной попытке.
  8. В конце он либо отправляет job.completed с кратким summary и финальными рекомендациями, либо job.failed с понятным объяснением.
  9. Виджет на основании этих событий обновляет UI, а ChatGPT может сформировать текстовую сводку и предложить follow‑up: «Показать ещё идеи», «Сузить бюджет», «Сменить тип подарка».

С точки зрения пользователя это «живой» долгий процесс, который под контролем. С точки зрения backend’а — нормальный async‑пайплайн с очередью, воркерами и retry.

12. Небольшое упражнение (для самостоятельной практики)

Если хотите закрепить материал, попробуйте для GiftGenius:

  • придумать схему таблицы jobs для реальной БД: какие индексы вам нужны, какие поля будут участвовать в фильтрации (по пользователю, по статусу, по дате создания);
  • набросать TypeScript‑тип для HTTP‑endpoint’а /api/jobs/:id, чтобы виджет мог в крайнем случае делать поллинг статуса, если SSE недоступен;
  • описать политику retry: сколько попыток, базовая задержка, что делать с задачами, которые всё равно упали (простая dead‑letter‑таблица или логирование + алерт).

Это задание вам пригодится позже, когда в модулях про продакшен и наблюдаемость мы будем говорить про метрики типа «сколько задач зависло в статусе pending дольше N минут».

13. Типичные ошибки при работе с асинхронными задачами

Ошибка №1: делать всё синхронно в tool‑call.
Самая распространённая ловушка — попытаться впихнуть всю тяжёлую работу в один MCP‑инструмент без очереди. Пока запросов мало, это вроде бы работает. Как только нагрузка растёт или внешние API начинают тормозить, вы ловите таймауты, зависания чата и крайне нервный UX. Любую операцию, которая потенциально может длиться десятки секунд и дольше, лучше сразу проектировать как async‑job с jobId.

Ошибка №2: отсутствие явной модели Job.
Иногда разработчики пытаются обойтись «просто сообщениями в очереди», не храня состояние задач в БД. В итоге трудно ответить на базовые вопросы: «каков статус задачи?», «сколько раз мы пытались её выполнить?», «почему она упала?». Чёткая модель Job с полями status, attempts, error, createdAt — это основа для дебага, мониторинга и UX.

Ошибка №3: отсутствие retry или, наоборот, бесконечные повторы.
Кто‑то вообще не делает retry и падает при первом же 500, кто‑то делает while (!success) и не ограничивает число попыток. В первом случае вы теряете кучу задач из‑за кратковременных сбоев, во втором — создаёте себе «шторма» нагрузки и рискуете заблокировать внешние API. Нужна разумная середина: ограниченное число попыток + экспоненциальная задержка + разделение временных и постоянных ошибок.

Ошибка №4: неидемпотентные обработчики.
Если при каждой попытке вы, например, создаёте новую запись в сторонней системе без проверки, выполняете один и тот же платёж или отправляете одно и то же письмо — retry быстро становится проблемой. Обработчик должен уметь понять, что задача с этим jobId уже была успешно доведена до конца, и не повторять опасные побочные эффекты.

Ошибка №5: отсутствие событий при ошибках.
Бывает, что воркер падает с неожиданным исключением, логирует его в консоль и на этом всё. Пользователь сидит и вечно ждёт job.completed, не зная, что всё уже давно умерло. Любая ветка, где обработка завершилась с ошибкой, должна в итоге приводить к job.failed и обновлению статуса Job в БД. Без этого ваши потоки MCP превращаются в односторонний «чёрный ящик».

Ошибка №6: слишком частые события прогресса.
Желание «быть честным» и слать job.progress на каждый один процент выполнения приводит к перегрузке сети, клиента и MCP‑сервера. Лучше отправлять прогресс при смене этапа или крупной дельте (например, каждые 10%), а всё остальное хранить только во внутренних логах.

Ошибка №7: использование ин‑мемори очереди в продакшене.
Учебный пример с queue: string[] и Map — хорош для понимания архитектуры, но в реальной продакшен‑системе он развалится при первом же рестарте процесса или падении сервера. Для серьёзной эксплуатации нужны внешние очереди и хранилища: SQS, Pub/Sub, RabbitMQ, Redis Streams и т.п. Ин‑мемори варианты годятся только для локальной разработки и простых демо.

1
Задача
ChatGPT Apps, 13 уровень, 3 лекция
Недоступна
Быстрый старт async-job + сохранение jobId в widgetState
Быстрый старт async-job + сохранение jobId в widgetState
1
Задача
ChatGPT Apps, 13 уровень, 3 лекция
Недоступна
Worker loop + поллинг статуса (start → queue → worker → poll)
Worker loop + поллинг статуса (start → queue → worker → poll)
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ