JavaRush /Курсы /Модуль 4: FastAPI /Асинхронная обработка задач через Celery в FastAPI

Асинхронная обработка задач через Celery в FastAPI

Модуль 4: FastAPI
13 уровень , 6 лекция
Открыта

Вы когда-нибудь замечали, как иногда при загрузке веб-страницы кнопка "Отправить" висит в ожидании?

Или как отправка электронной почты может занимать несколько секунд?

Если вы не хотите, чтобы пользователь ждал и скучал, асинхронная обработка задач — это ваш лучший друг.

Celery позволяет делегировать "тяжёлую работу" в фон, например:

  • Отправку писем.
  • Обработку изображений.
  • Выполнение сложных вычислений.
  • Сохранение данных в сторонние системы.

Наше приложение: что мы сделаем?

Мы создадим небольшой API на FastAPI, который принимает запрос на выполнение ресурсоёмкой задачи.

Вместо того чтобы выполнять её синхронно, мы отправим её в очередь Celery для асинхронной обработки, чтобы не блокировать основной поток.

Технический стек:

  • FastAPI для создания API.
  • Celery для фоновой обработки задач.
  • Redis в качестве брокера сообщений.

Шаг 1: установка необходимых зависимостей

Для начала убедитесь, что установлен Python 3.8+ и виртуальное окружение активировано. Установим Celery, FastAPI и Redis:

pip install fastapi uvicorn celery[redis]

Если вы забыли, что Redis — это наш "почтовый ящик" для сообщений между FastAPI и воркерами Celery, не переживайте. Мы его скоро настроим.


Шаг 2: Настройка Celery для FastAPI

Создадим структуру проекта:

my_async_app/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── tasks.py
│   └── celery_utils.py
├── celeryconfig.py
└── requirements.txt

Файл celeryconfig.py

Сначала опишем конфигурацию Celery в файле celeryconfig.py:


# celeryconfig.py

from celery import Celery

# Создаём приложение Celery
celery_app = Celery("my_async_app")

# Указываем брокер сообщений (Redis)
celery_app.conf.broker_url = "redis://localhost:6379/0"

# Хранилище результатов (опционально)
celery_app.conf.result_backend = "redis://localhost:6379/1"

# Формат сообщений (JSON — самое универсальное)
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["json"]

Этот файл — сердце нашей системы задач. В нём мы указываем, где находится Redis, чтобы Celery мог общаться с ним.

Теперь определим наши задачи. Это файл, где мы опишем "рабочие процессы".


# app/tasks.py

from celery import Task
from time import sleep

from my_async_app.celeryconfig import celery_app

@celery_app.task(name="send_email_task")
def send_email(to_email: str):
    """
    Пример фоновой задачи: отправка письма.
    """
    sleep(5)  # Симуляция долгого процесса
    return {"status": "Email sent", "to": to_email}

Здесь мы создали простую задачу, которая симулирует отправку электронного письма. Вместо реальной отправки мы просто "засыпаем" на 5 секунд.


Шаг 3: интеграция Celery с FastAPI

Теперь добавим маршруты API для взаимодействия с Celery.

Файл main.py


# app/main.py

from fastapi import FastAPI, BackgroundTasks
from app.tasks import send_email
from celery.result import AsyncResult
from my_async_app.celeryconfig import celery_app

app = FastAPI()

@app.post("/send-email/")
async def send_email_endpoint(email: str):
    """
    Эндпоинт для создания задачи на отправку письма.
    """
    task = send_email.delay(email)  # Отправляем задачу в очередь Celery
    return {"task_id": task.id, "message": "Task has been submitted"}

@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
    """
    Эндпоинт для проверки статуса задачи.
    """
    task_result = AsyncResult(task_id, app=celery_app)
    
    if task_result.state == "PENDING":
        return {"task_id": task_id, "status": "Pending"}
    elif task_result.state == "SUCCESS":
        return {"task_id": task_id, "status": "Completed", "result": task_result.result}
    else:
        return {"task_id": task_id, "status": task_result.state}

В этом коде:

  • Мы создаём задачу с помощью delay() и возвращаем её id пользователю.
  • Пользователь может проверить статус задачи по task_id через другой эндпоинт.

Шаг 4: запуск приложения

  1. Сначала запустите Redis:
    redis-server
  2. Запустите воркер Celery:
    celery -A celeryconfig worker --loglevel=info
  3. Запустите FastAPI приложение:
    uvicorn app.main:app --reload

Шаг 5: Проверка работы системы

  1. Отправка задачи.
    В терминале или через Postman отправьте запрос:
    curl -X POST "http://127.0.0.1:8000/send-email/" -H "Content-Type: application/json" -d '{"email":"test@example.com"}'
    Вы получите ответ:
    {
      "task_id": "e54eaafb-1234-5678-9abc-ddef01234567",
      "message": "Task has been submitted"
    }
  2. Проверка статуса задачи.
    Теперь вы можете проверить статус с помощью полученного task_id:
    curl -X GET "http://127.0.0.1:8000/task-status/e54eaafb-1234-5678-9abc-ddef01234567"
    Пример ответа:
    {
      "task_id": "e54eaafb-1234-5678-9abc-ddef01234567",
      "status": "Completed",
      "result": {"status": "Email sent", "to": "test@example.com"}
    }

Типичные ошибки и их решение

  1. Celery не запускается?
    Проверьте, установлен ли Redis и запущен ли он. Также проверьте брокер URL в celeryconfig.py.
  2. Redis занята другим процессом?
    Если вы использовали одну и ту же базу Redis для брокера и результата (например, redis://localhost:6379/0), разделите их. Используйте разные индексы, например /0 и /1.
  3. Долгая отправка задачи?
    Убедитесь, что Celery воркер запущен и обрабатывает задачи. Проверьте лог воркера для ошибок.

Реальная польза и применение

Использование Celery для асинхронных задач — это отличное решение для веб-приложений, которым нужно обрабатывать длительные или критичные для производительности задачи. Спроектированное решение с FastAPI и Celery может стать основой для обработки больших объёмов данных, отправки уведомлений или взаимодействия с внешними API.

В следующей лекции мы расширим наши знания и рассмотрим планирование задач с Celery Beat — инструментом, который позволяет программировать периодические задания, например, задачи очистки или регулярные бэкапы.

1
Задача
Модуль 4: FastAPI, 13 уровень, 6 лекция
Недоступна
Создание и выполнение задачи в Celery
Создание и выполнение задачи в Celery
1
Задача
Модуль 4: FastAPI, 13 уровень, 6 лекция
Недоступна
Асинхронный API с Celery
Асинхронный API с Celery
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ