Когда ваше приложение сталкивается с длительными вычислениями, такими как обработка изображений, отправка писем или взаимодействие с внешними API, блокировка основной веб-сессии приводит к замедлению работы системы и ухудшению пользовательского опыта. Это то же самое, как если бы вы пришли в кафе, сделали заказ, и официант ждал у плиты, пока ваше блюдо приготовится, игнорируя других клиентов.
Celery решает эту проблему. Он отделяет выполнение таких "тяжелых операций" в фоновые процессы, чтобы основное приложение продолжало свою работу, отвечая на запросы молниеносно.
Celery строится на трёх основных компонентах:
- Продюсер (Producer) — это ваше веб-приложение (например, FastAPI или Django), которое создаёт задачи и отправляет их в очередь. То есть оно говорит: "Вот задача, кто-нибудь, пожалуйста, займитесь этим".
- Брокер сообщений (Message Broker) — посредник, через которого передаются задачи. Это может быть Redis, RabbitMQ или другой инструмент. Он как почтовая служба: принимает задачи от продюсера и передаёт их воркерам.
- Воркеры (Workers) — фоновые процессы, которые получают задачи из очереди, выполняют их и (если нужно) возвращают результат. Они как исполнители, которые сидят и ждут: "Ну, давайте, дайте работу!"
Поток работы
graph TD
A[Клиент] --> B{"FastAPI / Django
(Producer)"};
B -- 1 Отправляет задачу task.delay() --> C((Broker
Redis / RabbitMQ));
C -- 2 Хранит задачу --> C;
C -- 3 Отдает задачу --> D[Celery Worker];
D -- 4 Выполняет задачу --> D;
D -- 5 Сохраняет результат (опц.) --> E((Result
Backend));
B -- 6 Запрашивает результат (опц.) --> E;
style D fill:#cfc,stroke:#333;
Ваше приложение (B) быстро отдает задачу брокеру (C) и отвечает клиенту. Воркер (D) в это время независимо выполняет тяжелую работу.
Создание и регистрация задач в Celery
Для создания задачи в Celery используется декоратор @celery.task. Этот декоратор говорит вашему приложению: "Эй, Celery, вот задача, которую я хочу выполнять в фоновом режиме!"
Пример для Django-проекта:
from celery import shared_task
@shared_task
def add(x, y):
return x + y
Пример для FastAPI:
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def multiply(a, b):
return a * b
Теперь, вместо выполнения функции напрямую, задача отправляется в очередь, например:
# Django: вызываем задачу
add.delay(3, 7)
# FastAPI: отправляем задачу на исполнение
multiply.delay(4, 10)
Здесь .delay() — это метод, который отправляет задачу в очередь. Он не блокирует выполнение кода и не дожидается результата.
Как Celery обрабатывает задачи
Когда вы запускаете воркеры Celery, они подключаются к брокеру сообщений и подписываются на определенные очереди. Как только задача появляется в очереди, воркер берет её, выполняет и возвращает результат на сохранение (если так настроено).
Воркеры запускаются с помощью команды:
celery -A your_project worker --loglevel=info
-A your_projectуказывает имя вашего Python-проекта или Celery приложения.workerзапускает процесс, который будет обрабатывать задачи.--loglevel=infoпоказывает информацию о каждой задаче в консоли.
Простая демонстрация выполнения задачи
Допустим, у нас есть задача в tasks.py:
from celery import shared_task
@shared_task
def reverse_string(s):
return s[::-1]
Мы вызываем её из пользовательского кода (например, во время обработки HTTP-запроса):
# Django View
from .tasks import reverse_string
def my_view(request):
string_to_reverse = "Celery"
task = reverse_string.delay(string_to_reverse)
return HttpResponse(f"Task ID: {task.id}")
Воркеры подключаются к брокеру (Redis/RabbitMQ), получают задачу и выполняют её:
- Они реверсируют строку "Celery" на "yreleC".
- Результат можно сохранить, либо дождаться выполнения (это будет рассмотрено далее).
Управление задачами и их статусами
Когда задача выполнена, результат можно сохранить. Celery предоставляет настраиваемый механизм хранения (backends), например, вы можете использовать Redis или базу данных.
Пример подключения backend в конфигурационный файл celery.py:
from celery import Celery
app = Celery(
'my_project',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1', # Хранение результатов
)
Теперь посмотреть результат можно так:
result = reverse_string.delay("Celery")
print(result.get(timeout=10)) # Выведет: "yreleC"
Основные состояния задач
Celery использует систему состояний для отслеживания выполнения задач:
- PENDING: задача поставлена в очередь, но ещё не обработана воркером.
- STARTED: воркеры начали выполнение задачи.
- SUCCESS: задача успешно выполнена.
- FAILURE: задача завершилась с ошибкой.
- RETRY: задача была отправлена на повтор.
- REVOKED: задача была отменена.
Пример получения статуса задачи:
status = result.status
print(f"Status of the task: {status}")
Настройка результата выполнения задач
Иногда результат выполнения задачи лучше отключить (например, когда это не требуется). Это можно сделать через настройку CELERY_TASK_IGNORE_RESULT:
app.conf.update(
task_ignore_result=True
)
Потенциальные нюансы и типичные ошибки
При работе с Celery полезно учитывать несколько моментов. Например, задачи могут застревать в статусе PENDING, если воркеры не подключены или неправильно настроен брокер. Проверьте подключение воркеров командой celery -A my_project inspect active.
Ещё одна популярная ошибка — добавление сложных объектов (таких как файлы или не сериализуемые Python-объекты) в задачу. Celery требует, чтобы входные данные задач были сериализуемыми, например JSON.
Цели, которые мы рассмотрели сегодня, являются основывающимися на создаваемых и масштабируемых фоновых задачах. На следующей лекции мы будем интегрировать Celery, используя RabbitMQ, чтобы усилить нашу асинхронную обработку задач.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ