Ти колись помічав, як іноді при завантаженні веб‑сторінки кнопка "Відправити" зависає в очікуванні?
Або як відправка електронного листа може займати кілька секунд?
Якщо не хочеш, щоб користувач чекав і нудьгував, асинхронна обробка задач — це твій найкращий друг.
Celery дозволяє делегувати "важку роботу" у фон, наприклад:
- Відправку листів.
- Обробку зображень.
- Виконання складних обчислень.
- Збереження даних у сторонні системи.
Наш додаток: що ми зробимо?
Ми створимо невелике API на FastAPI, яке приймає запит на виконання ресурсоємної задачі.
Замість того, щоб виконувати її синхронно, ми відправимо її в чергу Celery для асинхронної обробки, щоб не блокувати основний потік.
Технічний стек:
- FastAPI для створення API.
- Celery для фонової обробки задач.
- Redis як брокер повідомлень.
Крок 1: встановлення необхідних залежностей
Спочатку переконайся, що встановлений Python 3.8+ і віртуальне оточення активоване. Встановимо Celery, FastAPI і Redis:
pip install fastapi uvicorn celery[redis]
Якщо ти забув, що Redis — це наша "поштова скринька" для повідомлень між FastAPI і воркерами Celery, не переживай. Ми його скоро налаштуємо.
Крок 2: Налаштування Celery для FastAPI
Створимо структуру проекту:
my_async_app/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── tasks.py
│ └── celery_utils.py
├── celeryconfig.py
└── requirements.txt
Файл celeryconfig.py
Спочатку опишемо конфігурацію Celery у файлі celeryconfig.py:
# celeryconfig.py
from celery import Celery
# Створюємо додаток Celery
celery_app = Celery("my_async_app")
# Вказуємо брокера повідомлень (Redis)
celery_app.conf.broker_url = "redis://localhost:6379/0"
# Сховище результатів (опціонально)
celery_app.conf.result_backend = "redis://localhost:6379/1"
# Формат повідомлень (JSON — найуніверсальніший)
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["json"]
Цей файл — серце нашої системи задач. У ньому ми вказуємо, де знаходиться Redis, щоб Celery міг з ним спілкуватися.
Тепер визначимо наші задачі. Це файл, де опишемо "робочі процеси".
# app/tasks.py
from celery import Task
from time import sleep
from my_async_app.celeryconfig import celery_app
@celery_app.task(name="send_email_task")
def send_email(to_email: str):
"""
Приклад фонової задачі: відправлення листа.
"""
sleep(5) # Імітація довгого процесу
return {"status": "Email sent", "to": to_email}
Тут ми створили просту задачу, яка імітує відправку електронного листа. Замість реальної відправки ми просто "засинаємо" на 5 секунд.
Крок 3: інтеграція Celery з FastAPI
Тепер додамо маршрути API для взаємодії з Celery.
Файл main.py
# app/main.py
from fastapi import FastAPI, BackgroundTasks
from app.tasks import send_email
from celery.result import AsyncResult
from my_async_app.celeryconfig import celery_app
app = FastAPI()
@app.post("/send-email/")
async def send_email_endpoint(email: str):
"""
Ендпоінт для створення задачі на відправку листа.
"""
task = send_email.delay(email) # Відправляємо задачу в чергу Celery
return {"task_id": task.id, "message": "Task has been submitted"}
@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
"""
Ендпоінт для перевірки статусу задачі.
"""
task_result = AsyncResult(task_id, app=celery_app)
if task_result.state == "PENDING":
return {"task_id": task_id, "status": "Pending"}
elif task_result.state == "SUCCESS":
return {"task_id": task_id, "status": "Completed", "result": task_result.result}
else:
return {"task_id": task_id, "status": task_result.state}
У цьому коді:
- Ми створюємо задачу за допомогою
delay()і повертаємо їїidкористувачу. - Користувач може перевірити статус задачі по
task_idчерез інший ендпоінт.
Крок 4: запуск додатку
- Спочатку запусти Redis:
redis-server - Запусти воркер Celery:
celery -A celeryconfig worker --loglevel=info - Запусти FastAPI додаток:
uvicorn app.main:app --reload
Крок 5: Перевірка роботи системи
- Відправка задачі.
У терміналі або через Postman надішли запит:
Отримаєш відповідь:curl -X POST "http://127.0.0.1:8000/send-email/" -H "Content-Type: application/json" -d '{"email":"test@example.com"}'{ "task_id": "e54eaafb-1234-5678-9abc-ddef01234567", "message": "Task has been submitted" } - Перевірка статусу задачі.
Тепер можна перевірити статус за отриманимtask_id:
Приклад відповіді:curl -X GET "http://127.0.0.1:8000/task-status/e54eaafb-1234-5678-9abc-ddef01234567"{ "task_id": "e54eaafb-1234-5678-9abc-ddef01234567", "status": "Completed", "result": {"status": "Email sent", "to": "test@example.com"} }
Типові помилки та їх вирішення
- Celery не запускається?
Перевір, чи встановлений Redis і чи він запущений. Також перевір broker URL уceleryconfig.py. - Redis зайнятий іншим процесом?
Якщо ти використав одну і ту ж базу Redis для брокера і результатів (наприклад,redis://localhost:6379/0), розділи їх. Використай різні індекси, наприклад/0і/1. - Довга відправка задачі?
Переконайся, що Celery воркер запущений і обробляє задачі. Перевір лог воркера на помилки.
Реальна користь і застосування
Використання Celery для асинхронних задач — відмінне рішення для веб‑додатків, яким потрібно обробляти тривалі або критичні для продуктивності задачі. Спроєктоване рішення з FastAPI і Celery може стати основою для обробки великих обсягів даних, відправки нотифікацій або взаємодії з зовнішніми API.
У наступній лекції ми розширимо наші знання і розглянемо планування задач з Celery Beat — інструментом, який дозволяє програмувати періодичні завдання, наприклад, задачі очищення або регулярні бекапи.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ