В Celery задачи — это функции, которые вы аннотируете специальным декоратором @celery.task.
Вот пример самой простой задачи, которая возвращает сумму двух чисел:
# tasks.py
from celery import Celery
app = Celery('example_app', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
В этом примере:
- Мы импортировали Celery и создали экземпляр приложения через
Celery('example_app'), указав брокера сообщений (Redis). - Мы определили функцию
add(x, y)и зарегистрировали её как задачу с помощью декоратора@app.task.
Не забывайте структурировать проект! Обычно создаётся отдельный файл tasks.py, где и размещаются функции задач.
Хорошая структура = меньше головной боли.
Выполнение задач
Чтобы направить задачу на выполнение (то есть отправить её в Celery-очередь), используйте метод .delay():
# Отправляем задачу на выполнение
result = add.delay(4, 6)
# Результат выполнения *можно* получить через result.get() (если нужно)
print(result.get()) # Вывод: 10
Обратите внимание, что вызов add.delay() не блокирует выполнение кода. Задача создаётся и отправляется в очередь, а Celery её обрабатывает независимо.
Выполнение задач с приоритетами
Celery позволяет задавать приоритет задачам, чтобы важные задачи обрабатывались раньше остальных. Вот как это делается:
- Настраиваем приоритеты очередей в конфигурации Celery:
# celeryconfig.py task_queues = { # Указываем имя очереди и уровни приоритета 'default': {'exchange': 'default', 'routing_key': 'default', 'priority_levels': 10}, } - Передаём приоритет при вызове задачи:
# Задача с высоким приоритетом add.apply_async((5, 7), priority=9)Здесь
priority=9указывает на высокий приоритет (чем выше число, тем выше приоритет).
Работа с результатами задач
Когда наши задачи выполняются, их результаты можно сохранять, обрабатывать или игнорировать.
Celery предоставляет механизм хранения результатов через backend (например, Redis). Вот пример:
Включим функцию сохранения результатов через Redis:
# celeryconfig.py
result_backend = 'redis://localhost:6379/1'
В этом случае результаты выполнения задач сохраняются в Redis (в базе 1). Теперь можно получить результат выполнения:
result = add.delay(10, 20)
# Ожидаем выполнение и получаем результат
print(result.get(timeout=10)) # Вывод: 30
Вызов result.get() блокирует выполнение кода до завершения задачи, что противоречит принципу асинхронности.
Используйте этот подход только если это необходимо.
Практический пример: отправка email
Давайте реализуем классический сценарий: отправка подтверждения о регистрации пользователя.
Мы создадим задачу, которая отправляет email асинхронно.
- Убедимся, что у нас установлен SMTP-сервер. Для тестов можно использовать MailHog.
- Создадим задачу в
tasks.py:# tasks.py from celery import Celery from time import sleep app = Celery('email_app', broker='redis://localhost:6379/0') @app.task def send_email(email, subject, body): """ Пример эмуляции отправки письма (вместо реального SMTP поработаем с 'print') """ sleep(2) # Имитируем задержку отправки print(f"Сообщение отправлено: {email}, {subject}, {body}") return f"Email отправлен для {email}"Задача
send_emailэмулирует отправку письма. - Вызываем задачу из нашего приложения:
# Где-то в вашем коде (например, во view или endpoint) from tasks import send_email # Отправляем email в фоновую задачу result = send_email.delay( email="user@example.com", subject="Добро пожаловать!", body="Спасибо за регистрацию на нашем сайте!" ) # Можно также получить результат, если это нужно print(result.get()) # Вывод: "Email отправлен для user@example.com"Этот пример показывает, как легко связать бизнес-логику приложения с Celery, разгружая главный процесс. Асинхронность FTW (for the win)!
Ключевые ошибки при работе с Celery
- Неправильная конфигурация брокера сообщений. Если вы забыли настроить правильный адрес брокера (
broker_url), задачи просто "исчезнут" в небытии. Всегда проверяйте конфигурацию. - Ожидание
result.get()для длительных задач. Цель Celery — асинхронность. Не блокируйте приложение этим вызовом, если в этом нет крайней необходимости. - Отсутствие мониторинга. Без инструментов вроде Flower или логов сложно заметить, если что-то пошло не так.
Усложняем задачи
- Давайте добавим задачу с расчетом факториала, чтобы продемонстрировать вычисление:
@app.task def calculate_factorial(n): result = 1 for i in range(1, n + 1): result *= i sleep(0.1) # Замедляем для эмуляции долгой операции return result - Используем её:
# Запускаем задачу factorial_result = calculate_factorial.delay(10) # Работаем дальше параллельно... (ждем, но не блокируем!) print("Фоновая задача запущена!")
Теперь ваш Celery умеет делать полезные штуки: от отправки писем до вычислений.
В следующих лекциях вы сможете улучшить этот функционал, добавив мониторинг и планирование задач с Celery Beat.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ