Вы когда-нибудь замечали, как иногда при загрузке веб-страницы кнопка "Отправить" висит в ожидании?
Или как отправка электронной почты может занимать несколько секунд?
Если вы не хотите, чтобы пользователь ждал и скучал, асинхронная обработка задач — это ваш лучший друг.
Celery позволяет делегировать "тяжёлую работу" в фон, например:
- Отправку писем.
- Обработку изображений.
- Выполнение сложных вычислений.
- Сохранение данных в сторонние системы.
Наше приложение: что мы сделаем?
Мы создадим небольшой API на FastAPI, который принимает запрос на выполнение ресурсоёмкой задачи.
Вместо того чтобы выполнять её синхронно, мы отправим её в очередь Celery для асинхронной обработки, чтобы не блокировать основной поток.
Технический стек:
- FastAPI для создания API.
- Celery для фоновой обработки задач.
- Redis в качестве брокера сообщений.
Шаг 1: установка необходимых зависимостей
Для начала убедитесь, что установлен Python 3.8+ и виртуальное окружение активировано. Установим Celery, FastAPI и Redis:
pip install fastapi uvicorn celery[redis]
Если вы забыли, что Redis — это наш "почтовый ящик" для сообщений между FastAPI и воркерами Celery, не переживайте. Мы его скоро настроим.
Шаг 2: Настройка Celery для FastAPI
Создадим структуру проекта:
my_async_app/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── tasks.py
│ └── celery_utils.py
├── celeryconfig.py
└── requirements.txt
Файл celeryconfig.py
Сначала опишем конфигурацию Celery в файле celeryconfig.py:
# celeryconfig.py
from celery import Celery
# Создаём приложение Celery
celery_app = Celery("my_async_app")
# Указываем брокер сообщений (Redis)
celery_app.conf.broker_url = "redis://localhost:6379/0"
# Хранилище результатов (опционально)
celery_app.conf.result_backend = "redis://localhost:6379/1"
# Формат сообщений (JSON — самое универсальное)
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["json"]
Этот файл — сердце нашей системы задач. В нём мы указываем, где находится Redis, чтобы Celery мог общаться с ним.
Теперь определим наши задачи. Это файл, где мы опишем "рабочие процессы".
# app/tasks.py
from celery import Task
from time import sleep
from my_async_app.celeryconfig import celery_app
@celery_app.task(name="send_email_task")
def send_email(to_email: str):
"""
Пример фоновой задачи: отправка письма.
"""
sleep(5) # Симуляция долгого процесса
return {"status": "Email sent", "to": to_email}
Здесь мы создали простую задачу, которая симулирует отправку электронного письма. Вместо реальной отправки мы просто "засыпаем" на 5 секунд.
Шаг 3: интеграция Celery с FastAPI
Теперь добавим маршруты API для взаимодействия с Celery.
Файл main.py
# app/main.py
from fastapi import FastAPI, BackgroundTasks
from app.tasks import send_email
from celery.result import AsyncResult
from my_async_app.celeryconfig import celery_app
app = FastAPI()
@app.post("/send-email/")
async def send_email_endpoint(email: str):
"""
Эндпоинт для создания задачи на отправку письма.
"""
task = send_email.delay(email) # Отправляем задачу в очередь Celery
return {"task_id": task.id, "message": "Task has been submitted"}
@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
"""
Эндпоинт для проверки статуса задачи.
"""
task_result = AsyncResult(task_id, app=celery_app)
if task_result.state == "PENDING":
return {"task_id": task_id, "status": "Pending"}
elif task_result.state == "SUCCESS":
return {"task_id": task_id, "status": "Completed", "result": task_result.result}
else:
return {"task_id": task_id, "status": task_result.state}
В этом коде:
- Мы создаём задачу с помощью
delay()и возвращаем еёidпользователю. - Пользователь может проверить статус задачи по
task_idчерез другой эндпоинт.
Шаг 4: запуск приложения
- Сначала запустите Redis:
redis-server - Запустите воркер Celery:
celery -A celeryconfig worker --loglevel=info - Запустите FastAPI приложение:
uvicorn app.main:app --reload
Шаг 5: Проверка работы системы
- Отправка задачи.
В терминале или через Postman отправьте запрос:
Вы получите ответ:curl -X POST "http://127.0.0.1:8000/send-email/" -H "Content-Type: application/json" -d '{"email":"test@example.com"}'{ "task_id": "e54eaafb-1234-5678-9abc-ddef01234567", "message": "Task has been submitted" } - Проверка статуса задачи.
Теперь вы можете проверить статус с помощью полученногоtask_id:
Пример ответа:curl -X GET "http://127.0.0.1:8000/task-status/e54eaafb-1234-5678-9abc-ddef01234567"{ "task_id": "e54eaafb-1234-5678-9abc-ddef01234567", "status": "Completed", "result": {"status": "Email sent", "to": "test@example.com"} }
Типичные ошибки и их решение
- Celery не запускается?
Проверьте, установлен ли Redis и запущен ли он. Также проверьте брокер URL вceleryconfig.py. - Redis занята другим процессом?
Если вы использовали одну и ту же базу Redis для брокера и результата (например,redis://localhost:6379/0), разделите их. Используйте разные индексы, например/0и/1. - Долгая отправка задачи?
Убедитесь, что Celery воркер запущен и обрабатывает задачи. Проверьте лог воркера для ошибок.
Реальная польза и применение
Использование Celery для асинхронных задач — это отличное решение для веб-приложений, которым нужно обрабатывать длительные или критичные для производительности задачи. Спроектированное решение с FastAPI и Celery может стать основой для обработки больших объёмов данных, отправки уведомлений или взаимодействия с внешними API.
В следующей лекции мы расширим наши знания и рассмотрим планирование задач с Celery Beat — инструментом, который позволяет программировать периодические задания, например, задачи очистки или регулярные бэкапы.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ