JavaRush /Курси /Модуль 4: FastAPI /Асинхронна обробка задач через Celery у FastAPI

Асинхронна обробка задач через Celery у FastAPI

Модуль 4: FastAPI
Рівень 13 , Лекція 6
Відкрита

Ти колись помічав, як іноді при завантаженні веб‑сторінки кнопка "Відправити" зависає в очікуванні?

Або як відправка електронного листа може займати кілька секунд?

Якщо не хочеш, щоб користувач чекав і нудьгував, асинхронна обробка задач — це твій найкращий друг.

Celery дозволяє делегувати "важку роботу" у фон, наприклад:

  • Відправку листів.
  • Обробку зображень.
  • Виконання складних обчислень.
  • Збереження даних у сторонні системи.

Наш додаток: що ми зробимо?

Ми створимо невелике API на FastAPI, яке приймає запит на виконання ресурсоємної задачі.

Замість того, щоб виконувати її синхронно, ми відправимо її в чергу Celery для асинхронної обробки, щоб не блокувати основний потік.

Технічний стек:

  • FastAPI для створення API.
  • Celery для фонової обробки задач.
  • Redis як брокер повідомлень.

Крок 1: встановлення необхідних залежностей

Спочатку переконайся, що встановлений Python 3.8+ і віртуальне оточення активоване. Встановимо Celery, FastAPI і Redis:

pip install fastapi uvicorn celery[redis]

Якщо ти забув, що Redis — це наша "поштова скринька" для повідомлень між FastAPI і воркерами Celery, не переживай. Ми його скоро налаштуємо.


Крок 2: Налаштування Celery для FastAPI

Створимо структуру проекту:

my_async_app/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── tasks.py
│   └── celery_utils.py
├── celeryconfig.py
└── requirements.txt

Файл celeryconfig.py

Спочатку опишемо конфігурацію Celery у файлі celeryconfig.py:


# celeryconfig.py

from celery import Celery

# Створюємо додаток Celery
celery_app = Celery("my_async_app")

# Вказуємо брокера повідомлень (Redis)
celery_app.conf.broker_url = "redis://localhost:6379/0"

# Сховище результатів (опціонально)
celery_app.conf.result_backend = "redis://localhost:6379/1"

# Формат повідомлень (JSON — найуніверсальніший)
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["json"]

Цей файл — серце нашої системи задач. У ньому ми вказуємо, де знаходиться Redis, щоб Celery міг з ним спілкуватися.

Тепер визначимо наші задачі. Це файл, де опишемо "робочі процеси".


# app/tasks.py

from celery import Task
from time import sleep

from my_async_app.celeryconfig import celery_app

@celery_app.task(name="send_email_task")
def send_email(to_email: str):
    """
    Приклад фонової задачі: відправлення листа.
    """
    sleep(5)  # Імітація довгого процесу
    return {"status": "Email sent", "to": to_email}

Тут ми створили просту задачу, яка імітує відправку електронного листа. Замість реальної відправки ми просто "засинаємо" на 5 секунд.


Крок 3: інтеграція Celery з FastAPI

Тепер додамо маршрути API для взаємодії з Celery.

Файл main.py


# app/main.py

from fastapi import FastAPI, BackgroundTasks
from app.tasks import send_email
from celery.result import AsyncResult
from my_async_app.celeryconfig import celery_app

app = FastAPI()

@app.post("/send-email/")
async def send_email_endpoint(email: str):
    """
    Ендпоінт для створення задачі на відправку листа.
    """
    task = send_email.delay(email)  # Відправляємо задачу в чергу Celery
    return {"task_id": task.id, "message": "Task has been submitted"}

@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
    """
    Ендпоінт для перевірки статусу задачі.
    """
    task_result = AsyncResult(task_id, app=celery_app)
    
    if task_result.state == "PENDING":
        return {"task_id": task_id, "status": "Pending"}
    elif task_result.state == "SUCCESS":
        return {"task_id": task_id, "status": "Completed", "result": task_result.result}
    else:
        return {"task_id": task_id, "status": task_result.state}

У цьому коді:

  • Ми створюємо задачу за допомогою delay() і повертаємо її id користувачу.
  • Користувач може перевірити статус задачі по task_id через інший ендпоінт.

Крок 4: запуск додатку

  1. Спочатку запусти Redis:
    redis-server
  2. Запусти воркер Celery:
    celery -A celeryconfig worker --loglevel=info
  3. Запусти FastAPI додаток:
    uvicorn app.main:app --reload

Крок 5: Перевірка роботи системи

  1. Відправка задачі.
    У терміналі або через Postman надішли запит:
    curl -X POST "http://127.0.0.1:8000/send-email/" -H "Content-Type: application/json" -d '{"email":"test@example.com"}'
    Отримаєш відповідь:
    {
      "task_id": "e54eaafb-1234-5678-9abc-ddef01234567",
      "message": "Task has been submitted"
    }
  2. Перевірка статусу задачі.
    Тепер можна перевірити статус за отриманим task_id:
    curl -X GET "http://127.0.0.1:8000/task-status/e54eaafb-1234-5678-9abc-ddef01234567"
    Приклад відповіді:
    {
      "task_id": "e54eaafb-1234-5678-9abc-ddef01234567",
      "status": "Completed",
      "result": {"status": "Email sent", "to": "test@example.com"}
    }

Типові помилки та їх вирішення

  1. Celery не запускається?
    Перевір, чи встановлений Redis і чи він запущений. Також перевір broker URL у celeryconfig.py.
  2. Redis зайнятий іншим процесом?
    Якщо ти використав одну і ту ж базу Redis для брокера і результатів (наприклад, redis://localhost:6379/0), розділи їх. Використай різні індекси, наприклад /0 і /1.
  3. Довга відправка задачі?
    Переконайся, що Celery воркер запущений і обробляє задачі. Перевір лог воркера на помилки.

Реальна користь і застосування

Використання Celery для асинхронних задач — відмінне рішення для веб‑додатків, яким потрібно обробляти тривалі або критичні для продуктивності задачі. Спроєктоване рішення з FastAPI і Celery може стати основою для обробки великих обсягів даних, відправки нотифікацій або взаємодії з зовнішніми API.

У наступній лекції ми розширимо наші знання і розглянемо планування задач з Celery Beat — інструментом, який дозволяє програмувати періодичні завдання, наприклад, задачі очищення або регулярні бекапи.

Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ