JavaRush /Курси /ChatGPT Apps /Асинхронні завдання: черги, воркери, повторні спроби (ret...

Асинхронні завдання: черги, воркери, повторні спроби (retry)

ChatGPT Apps
Рівень 13 , Лекція 3
Відкрита

1. Навіщо взагалі асинхронні завдання в ChatGPT App

Якби світ був ідеальним, будь‑який ваш MCP‑інструмент (tool) виконувався б за кількасот мілісекунд. Та на практиці все найцікавіше — довге й важке:

  • розбір великого CSV з історією покупок користувача;
  • агрегація даних із кількох зовнішніх API, кожне з яких то «засинає», то відповідає 503;
  • побудова складних рекомендацій із багатьма проміжними кроками;
  • генерація великих звітів і презентацій.

Якщо спробувати втиснути все це в один синхронний tool‑call, ви упретеся в три проблеми.

По‑перше, тайм‑аути. ChatGPT‑сесія, HTTP‑інфраструктура, MCP‑клієнт — усе це не розраховано на відповідь «за пʼять хвилин». Сервер, який тримає з’єднання надто довго, виглядатиме «завислим» — і для ChatGPT, і для користувача.

По‑друге, керування навантаженням. Якщо сто користувачів одночасно запустять «супер‑аналіз подарунків на Новий рік», ви не хочете, щоб MCP‑сервер синхронно обслуговував сто довгих завдань просто в HTTP‑потоках. Потрібен проміжний шар, який уміє приймати сплески, розкладати завдання в черзі й обробляти їх кількома воркерами.

По‑третє, UX. Користувач натиснув кнопку у віджеті GiftGenius і бачить лише спінер протягом 40 секунд — відчуття приблизно як у старих інтернет‑банках. Набагато приємніша модель — «швидка відповідь + прогрес + можливість скасування».

Ці проблеми знімає проста схема: «запуск → черга → тло → події».

2. Базова архітектура async‑job у контексті MCP

Візьмімо наш GiftGenius. Припустімо, у нас зʼявився новий важкий сценарій: «Глибокий аналіз уподобань за історією покупок і соцмережами друга». Така штука може працювати кілька хвилин, тож:

  1. Інструмент MCP (tool) приймає параметри запиту від моделі.
  2. Замість того, щоб одразу все рахувати, він створює запис Job у базі.
  3. Кладе завдання в чергу.
  4. Миттєво відповідає ChatGPT: «Аналіз запущено, ось jobId».
  5. Фоновий воркер забирає завдання з черги, виконує важку роботу й у процесі шле MCP‑події job.progress і job.partial, а наприкінці — job.completed або job.failed.

З погляду архітектури це виглядає приблизно так:

flowchart LR
    subgraph ChatGPT
      U[Користувач] --> GPT[Модель + ChatGPT UI]
    end

    GPT -->|call_tool analyze_preferences| MCP[MCP‑сервер]

    subgraph Backend
      MCP -->|створити Job| DB[(БД завдань)]
      MCP -->|enqueue| Q[Черга]
      W[Воркер] -->|take job| Q
      W -->|update status/progress| DB
      W -->|MCP events: job.progress/job.completed| MCP
    end

    MCP -->|SSE events| GPT

Важлива думка: MCP‑сервер — це не обов’язково моноліт. Часто він працює як фасад над вашою внутрішньою асинхронною інфраструктурою: приймає tool‑calls, створює завдання (job) і надсилає події, а важку роботу виконують окремі процеси‑воркери.

3. Модель даних для асинхронного завдання

Почнемо з простої моделі Job. Ми використаємо TypeScript і умовний Node/MCP‑сервер, щоб ви відразу бачили, як це вписується у ваш технологічний стек.

Найпростіша модель у памʼяті або в БД може виглядати так:

// openai/jobs/model.ts
export type JobStatus =
  | 'pending'
  | 'in_progress'
  | 'completed'
  | 'failed'
  | 'canceled';

export interface GiftJob {
  id: string;                // jobId
  type: 'deep_gift_analysis';
  status: JobStatus;
  payload: {
    recipientProfile: string;  // текст/ID профілю
    budget: number;
  };
  result?: unknown;          // підсумкові рекомендації
  error?: string;            // причина помилки
  attempts: number;          // скільки разів намагалися виконати
  createdAt: Date;
  updatedAt: Date;
}

У реальному проєкті ви зберігатимете GiftJob у Postgres, DynamoDB, Firestore чи ще десь. Але для лекції нам важливі такі поля:

  • status — поточний стан завдання, який відображатиметься і в подіях, і в UX;
  • attempts — лічильник для retry;
  • error — для логів і налагодження;
  • payload — вхідні дані, які воркер використовує для обробки.

4. Інструмент MCP, що створює async‑job

Уявімо інструмент start_deep_analysis. Раніше він міг би робити все синхронно, а тепер лише додає завдання в чергу й повертає jobId.

// openai/tools/startDeepAnalysis.ts
import { v4 as uuid } from 'uuid';
import { createJobAndEnqueue } from '../jobs/queue';

// Псевдотипи для MCP SDK
type StartDeepAnalysisInput = {
  recipientProfile: string;
  budget: number;
};

type StartDeepAnalysisOutput = {
  jobId: string;
  message: string;
};

export async function startDeepAnalysisTool(
  input: StartDeepAnalysisInput
): Promise<StartDeepAnalysisOutput> {
  const jobId = uuid();

  await createJobAndEnqueue({
    id: jobId,
    type: 'deep_gift_analysis',
    status: 'pending',
    payload: {
      recipientProfile: input.recipientProfile,
      budget: input.budget,
    },
    attempts: 0,
    createdAt: new Date(),
    updatedAt: new Date(),
  });

  return {
    jobId,
    message: `Запущено глибокий аналіз. Ідентифікатор завдання: ${jobId}. Я надсилатиму оновлення в міру готовності.`,
  };
}

Тут важливо, що:

  • MCP‑інструмент працює швидко: максимум кілька запитів до БД/черги;
  • він повертає структуровану відповідь із jobId, який ChatGPT може використати у своєму «поясненні користувачеві», і який віджет GiftGenius може зберегти у своєму widgetState.

Ваша JSON Schema для цього інструмента просто описує jobId як рядок, а message — як зрозумілий текст. Модель побачить, що це ідентифікатор завдання, і зможе посилатися на нього в наступних кроках діалогу.

5. Проста черга й воркер: навчальна версія

Щоб не під’єднувати зараз Redis, RabbitMQ та все інше, зробімо спрощену чергу в пам’яті. У реальному продакшні, звісно, це буде окремий сервіс (SQS/BullMQ/Cloud Tasks тощо), але логіка залишиться такою самою.

Спочатку — заготовка черги:

// openai/jobs/queue.ts
import type { GiftJob } from './model';

const jobs = new Map<string, GiftJob>();   // "БД" у памʼяті
export const queue: string[] = [];         // спрощена черга за id

export async function createJobAndEnqueue(job: GiftJob) {
  jobs.set(job.id, job);
  queue.push(job.id);
}

export function getJob(id: string): GiftJob | undefined {
  return jobs.get(id);
}

export function updateJob(id: string, patch: Partial<GiftJob>) {
  const job = jobs.get(id);
  if (!job) return;
  const updated: GiftJob = { ...job, ...patch, updatedAt: new Date() };
  jobs.set(id, updated);
}

Тепер примітивний воркер, який періодично зазирає в чергу, бере job і обробляє його:

// openai/jobs/worker.ts
import { getJob, updateJob } from './queue';
import { emitJobEvent } from './events';

async function processJob(jobId: string) {
  const job = getJob(jobId);
  if (!job) return;

  updateJob(jobId, { status: 'in_progress' });
  await emitJobEvent(jobId, 'job.started', {});

  try {
    // Тут викликаємо довгу бізнес-логіку
    const result = await doDeepGiftAnalysis(job.id, job.payload);

    updateJob(jobId, { status: 'completed', result });
    await emitJobEvent(jobId, 'job.completed', { resultSummary: summarize(result) });
  } catch (err) {
    updateJob(jobId, {
      status: 'failed',
      error: (err as Error).message,
    });
    await emitJobEvent(jobId, 'job.failed', { error: 'Internal error' });
  }
}

І сам «циклічний» воркер, який можна запустити десь під час старту застосунку:

// openai/jobs/workerLoop.ts
import { queue } from './queue';
import { processJob } from './worker';

export function startWorkerLoop() {
  setInterval(async () => {
    const jobId = queue.shift(); // як слід, потрібен захист від гонок
    if (!jobId) return;

    await processJob(jobId);
  }, 1000); // раз на секунду перевіряємо чергу
}

Це навчальний приклад. У реальному світі замість setInterval буде повноцінна черга, яка сама «активуватиме» воркера, щойно з’являється нове повідомлення. Але загальна ідея добре видно: воркер відділено від MCP‑інструмента, він працює у тлі та спілкується з MCP‑сервером через події.

6. Генерація MCP‑подій із воркера

У попередніх лекціях ви вже бачили формат MCP‑подій: тип (type), унікальний event_id, timestamp, job_id і payload. Тепер покажемо, як воркер може викликати допоміжну функцію emitJobEvent, а та — передавати події до ChatGPT через SSE‑канал MCP‑сервера.

Приклад простої допоміжної функції:

// openai/jobs/events.ts
import { randomUUID } from 'crypto';
import { sendMcpEvent } from '../mcp/eventBus';

export async function emitJobEvent(
  jobId: string,
  type: 'job.started' | 'job.progress' | 'job.completed' | 'job.failed',
  payload: unknown
) {
  const event = {
    event_id: randomUUID(),
    type,
    job_id: jobId,
    timestamp: new Date().toISOString(),
    payload,
  };

  await sendMcpEvent(event);
}

А sendMcpEvent усередині MCP‑сервера вже знає, як саме передати цю подію в SSEServerTransport із MCP SDK: наприклад, через локальну шину подій або Redis Pub/Sub, як ми розбирали в модулі 12.

Ключова думка: воркер не спілкується з ChatGPT напряму. Він спілкується з MCP‑сервером, а той тримає SSE‑з’єднання та пересилає події клієнтам.

7. Прогрес і часткові результати з воркера

Тепер до найцікавішого: прогрес і часткові результати. У GiftGenius тривалий аналіз можна розбити на етапи:

  • збір і нормалізація даних;
  • побудова базових сегментів;
  • генерація первинних ідей подарунків;
  • фінальне ранжування й текстове пояснення.

На кожному етапі ми можемо надсилати job.progress і час від часу — job.partial, щоб UI вже показував перші подарунки.

Умовний воркер:

async function doDeepGiftAnalysis(jobId: string, payload: GiftJob['payload']) {
  await emitJobEvent(jobId, 'job.progress', { step: 1, totalSteps: 4 });

  const normalized = await collectAndNormalizeData(payload);
  await emitJobEvent(jobId, 'job.progress', { step: 2, totalSteps: 4 });

  const roughGifts = await generateInitialGifts(normalized);
  await emitJobEvent(jobId, 'job.partial', { gifts: roughGifts.slice(0, 3) });

  await emitJobEvent(jobId, 'job.progress', { step: 3, totalSteps: 4 });

  const finalGifts = await rerankAndBeautify(roughGifts);
  await emitJobEvent(jobId, 'job.progress', { step: 4, totalSteps: 4 });

  return finalGifts;
}

Віджет, слухаючи події, може спочатку показати 3 «чернеткові» подарунки з позначкою «Ще уточнюємо», а після job.completed — оновити список і прибрати індикатор завантаження. Усе це ідеально лягає на UX‑патерни, про які ми говорили в лекції 3.

8. Логіка retry для воркерів

Тепер найчутливіше місце: помилки й повторні спроби.

Уявіть, що воркер під час обробки завдання звертається до зовнішнього API списку товарів, а той періодично відповідає 500 або 429. Кидати завдання після першої ж помилки — дивно. Але й перезапускати без кінця не можна: так ви влаштуєте DDoS і собі, і сторонньому сервісу.

Нам потрібна стратегія retry з експоненційною затримкою та обмеженням кількості спроб.

Почнемо з класифікації помилок, яка знадобиться нам і далі в курсі:

  • тимчасові (transient) — тайм‑аути, 500, 503, 429;
  • постійні (permanent) — некоректний вхід, неіснуючий ресурс;
  • фатальні (bug) — помилки в коді, TypeError, неочікуваний виняток.

Повторювати має сенс лише тимчасові помилки. Решту потрібно чесно позначати як 'failed'.

Для простоти зробімо допоміжну функцію:

// openai/jobs/retry.ts
export function shouldRetry(error: unknown): boolean {
  if (!(error instanceof Error)) return false;
  // Умовно: HTTP 5xx або 429
  return /5\d\d|429/.test(error.message);
}

export function getDelayMs(base: number, attempt: number): number {
  const jitter = Math.random() * 100;   // невеликий шум
  return base * 2 ** attempt + jitter; // експоненційний backoff
}

Тепер оновимо воркер так, щоб він враховував attempts у GiftJob:

// openai/jobs/worker.ts
import { getJob, updateJob } from './queue';
import { emitJobEvent } from './events';
import { shouldRetry, getDelayMs } from './retry';

const MAX_ATTEMPTS = 5;

export async function processJob(jobId: string) {
  const job = getJob(jobId);
  if (!job) return;

  updateJob(jobId, { status: 'in_progress' });

  try {
    const result = await doDeepGiftAnalysis(job.id, job.payload);

    updateJob(jobId, { status: 'completed', result });
    await emitJobEvent(jobId, 'job.completed', {
      resultSummary: summarize(result),
    });
  } catch (err) {
    const attempts = job.attempts + 1;
    const error = err as Error;

    if (attempts <= MAX_ATTEMPTS && shouldRetry(error)) {
      const delay = getDelayMs(1000, attempts); // 1s,2s,4s...

      updateJob(jobId, { attempts, status: 'pending', error: error.message });

      setTimeout(() => {
        // У реальній черзі ви б повторно додали job у чергу із затримкою
        processJob(jobId);
      }, delay);

      await emitJobEvent(jobId, 'job.progress', {
        retry: attempts,
        nextAttemptInMs: delay,
      });
    } else {
      updateJob(jobId, { status: 'failed', error: error.message });
      await emitJobEvent(jobId, 'job.failed', {
        error: 'Не вдалося завершити аналіз після кількох спроб',
      });
    }
  }
}

Тут важливо кілька моментів.

По‑перше, attempts зберігається в самому завданні — це зручно і для логування, і для спостережності. (На графіку буде добре видно, скільки завдань виконуються з повторними спробами.)

По‑друге, під час кожної повторної спроби ми надсилаємо job.progress із явним зазначенням, що це спроба № N. Модель може використати цю інформацію, щоб пояснити користувачеві: «сервер подарунків відповідає нестабільно, пробую ще раз».

По‑третє, ми гарантуємо, що в будь‑якому разі буде надіслано або job.completed, або job.failed. Жодних підвішених завдань «ні живі, ні мертві».

Скасування ('canceled') — ще один важливий статус. У навчальних прикладах ми його не реалізуємо, але в продакшні його зазвичай виставляють з ініціативи користувача (кнопка «Скасувати» у віджеті) або за тайм‑аутом. У такому разі воркер під час наступного взяття завдання з черги бачить status: 'canceled', не запускає обробку, а MCP‑сервер надсилає фінальну подію job.canceled.

9. Ідемпотентність і retry: не наступаємо двічі на ті самі граблі

Коли ви додаєте retry, одразу з’являється ризик «зробити одне й те саме двічі». У комерційних модулях (commerce) це критично (наприклад, подвійне списання грошей), але й у GiftGenius теж є сценарії, де повтор — небажаний: надсилання двох однакових листів другові, дублювання запису у вашій внутрішній аналітиці тощо.

Тому варто орієнтуватися на два принципи.

Перший: обробник job має бути ідемпотентним.

Якщо ви викликаєте його з тим самим jobId кілька разів (у межах retry або помилково), світ не має ламатися. Для цього:

  • усі побічні ефекти (запис у БД, надсилання листів, створення замовлень) мають бути зав’язані на jobId або інший природний ідентифікатор, щоб у коді можна було швидко перевірити, чи виконували ми вже цей крок;
  • якщо job.status уже 'completed' або 'failed', повторний виклик можна або ігнорувати, або просто повернути вже готовий результат.

Приклад простого захисту:

export async function processJob(jobId: string) {
  const job = getJob(jobId);
  if (!job) return;

  if (job.status === 'completed' || job.status === 'failed') {
    // Завдання вже успішно або остаточно завершено
    return;
  }

  // ... решта коду
}

Другий: події теж мають бути ідемпотентними.

Ми вже говорили про event_id і те, що клієнт може фільтрувати дублікати, але на боці сервера теж варто бути уважним. Під час рестарту воркера або відновлення з черги не «засипайте» клієнта тими самими job.progress без потреби.

10. Де перебувають черги й воркери у вашій архітектурі

На схемі все красиво, але де фізично працює воркер? Є кілька типових варіантів.

Інтегрований воркер: MCP‑сервер і воркер — один і той самий процес/розгортання. Він же приймає tool‑calls, він же запускає worker loop. Плюс — у простоті: менше сервісів, простіше розгортати. Мінус — масштабування: щоб додати воркерів, доводиться масштабувати весь MCP‑сервер.

Виділений воркер: MCP‑сервер — один сервіс, воркери — інший. Між ними — черга і, можливо, Pub/Sub для подій. Це те, про що багато пишуть у контексті BullMQ/Redis і MCP‑подій: MCP‑сервер підписаний на Redis‑канал 'mcp:events', воркери публікують туди події.

Комбінований варіант: один екземпляр MCP‑сервера запускає і воркер, інші екземпляри — тільки HTTP/SSE. Це може бути корисним, якщо ви розгортаєтеся на Vercel або іншій serverless‑платформі, де постійні фонові процеси — не надто зручна історія.

У нашому навчальному GiftGenius поки що припустимий перший варіант: MCP‑сервер + один простий воркер у процесі. Коли ви дійдете до модулів про продакшн і масштабування, воркери можна буде мігрувати в окремий сервіс.

11. Приклад: повний async‑пайплайн GiftGenius

Давайте послідовно розберімо, що відбувається, коли користувач у чаті пише:

«Мені потрібен складний підбір подарунка для фаната космосу з урахуванням його минулих покупок».

  1. Модель вирішує викликати інструмент start_deep_analysis з параметрами профілю отримувача та бюджету.
  2. Інструмент створює GiftJob у БД зі статусом 'pending', кладе його в чергу й повертає jobId + повідомлення‑підтвердження.
  3. ChatGPT пояснює користувачеві, що аналіз запущено, і може передати jobId у віджет GiftGenius.
  4. Віджет підписується на події за цим jobId через SSE, показує прогрес‑бар і статус «Збираємо та аналізуємо дані».
  5. Воркер, побачивши новий job у черзі, оновлює статус на 'in_progress' і шле job.started.
  6. У процесі роботи він кілька разів надсилає job.progress (етапи) і job.partial (перші 23 подарунки).
  7. Якщо десь по дорозі «падає» зовнішнє API, воркер пробує ще раз з експоненційним backoff, оновлюючи attempts і надсилаючи подію з інформацією про повторну спробу.
  8. Наприкінці він або надсилає job.completed із коротким підсумком і фінальними рекомендаціями, або job.failed зі зрозумілим поясненням.
  9. На підставі цих подій віджет оновлює UI, а ChatGPT може сформувати текстову зведену відповідь і запропонувати наступні кроки: «Показати ще ідеї», «Звузити бюджет», «Змінити тип подарунка».

З погляду користувача це «живий» тривалий процес, яким можна керувати. З погляду бекенду — нормальний асинхронний пайплайн із чергою, воркерами та retry.

12. Невелика вправа (для самостійної практики)

Якщо хочете закріпити матеріал, спробуйте для GiftGenius:

  • придумати схему таблиці jobs для реальної БД: які індекси вам потрібні, які поля братимуть участь у фільтрації (за користувачем, за статусом, за датою створення);
  • накидати TypeScript‑тип для HTTP‑ендпойнта /api/jobs/:id, щоб віджет міг у крайньому разі робити опитування (polling) статусу, якщо SSE недоступний;
  • описати політику retry: скільки спроб, базова затримка, що робити із завданнями, які все одно не виконалися (проста dead‑letter‑таблиця або логування + алерт).

Це завдання вам знадобиться пізніше, коли в модулях про продакшн і спостережність ми говоритимемо про метрики на кшталт «скільки завдань зависло в статусі pending довше N хвилин».

13. Типові помилки під час роботи з асинхронними завданнями

Помилка №1: робити все синхронно в tool‑call.
Найпоширеніша пастка — спробувати втиснути всю важку роботу в один MCP‑інструмент без черги. Поки запитів мало, це ніби працює. Щойно навантаження зростає або зовнішні API починають гальмувати, ви ловите тайм‑аути, зависання чату й украй нервовий UX. Будь‑яку операцію, яка потенційно може тривати десятки секунд і довше, краще одразу проєктувати як async‑job із jobId.

Помилка №2: відсутність явної моделі Job.
Іноді розробники намагаються обійтися «просто повідомленнями в черзі», не зберігаючи стан завдань у БД. У підсумку важко відповісти на базові запитання: «який статус завдання?», «скільки разів ми намагалися його виконати?», «чому воно впало?». Чітка модель Job з полями status, attempts, error, createdAt — це основа для налагодження, моніторингу й UX.

Помилка №3: відсутність retry або, навпаки, нескінченні повтори.
Хтось узагалі не робить retry і «падає» при першому ж 500, а хтось пише while (!success) і не обмежує кількість спроб. У першому випадку ви втрачаєте купу завдань через короткочасні збої. У другому — створюєте собі бурі навантаження і ризикуєте заблокувати зовнішні API. Потрібна розумна середина: обмежена кількість спроб + експоненційна затримка + розділення тимчасових і постійних помилок.

Помилка №4: неідемпотентні обробники.
Якщо під час кожної спроби ви, наприклад, створюєте новий запис у сторонній системі без перевірки, виконуєте один і той самий платіж або надсилаєте один і той самий лист — retry швидко стає проблемою. Обробник має вміти зрозуміти, що завдання з цим jobId уже було успішно доведене до кінця, і не повторювати небезпечні побічні ефекти.

Помилка №5: відсутність подій при помилках.
Буває, що воркер падає з неочікуваним винятком, логує його в консоль — і на цьому все. Користувач сидить і без кінця чекає job.completed, не знаючи, що все вже давно «померло». Будь‑яка гілка, де обробка завершилася з помилкою, має зрештою призводити до job.failed і оновлення статусу Job у БД. Без цього ваші потоки MCP перетворюються на односторонній «чорний ящик».

Помилка №6: занадто часті події прогресу.
Бажання «бути чесним» і слати job.progress на кожен один відсоток виконання призводить до перевантаження мережі, клієнта й MCP‑сервера. Краще надсилати прогрес під час зміни етапу або суттєвого кроку (наприклад, кожні 10 %), а все інше зберігати лише у внутрішніх логах.

Помилка №7: використання черги в пам’яті в продакшні.
Навчальний приклад із queue: string[] і Map — хороший для розуміння архітектури, але в реальній продакшн‑системі він розвалиться при першому ж рестарті процесу або падінні сервера. Для серйозної експлуатації потрібні зовнішні черги та сховища: SQS, Pub/Sub, RabbitMQ, Redis Streams тощо. Варіанти «в пам’яті» годяться лише для локальної розробки й простих демо.

Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ