Введение
Сегодня мы реализуем пример полного цикла планирования, выполнения и оптимизации регулярных задач с использованием Celery и Celery Beat.
Для начала определимся, какую функциональность мы хотим добавить в наше приложение. Представим, что у нас есть приложение на FastAPI, которое отправляет ежедневные отчёты пользователям. Отправка таких отчётов должна быть полностью автоматизирована, и здесь нам поможет Celery Beat.
Определение задачи в Celery
Допустим, мы хотим ежедневно отправлять по электронной почте отчёты нашим пользователям. Для этого создадим Celery-задачу:
from celery import Celery
from datetime import datetime
# Создаем объект Celery
celery_app = Celery('tasks', broker='redis://localhost:6379/0')
@celery_app.task
def send_daily_report():
# Здесь мы симулируем отправку отчета
now = datetime.now()
print(f"[{now}] Отправка ежедневного отчета началась...")
# Добавьте вашу логику отправки данных тут
print(f"[{now}] Отчет успешно отправлен!")
Примечание: брокером у нас выступает Redis, о котором мы говорили ранее. Если вы используете RabbitMQ, просто замените broker='redis://localhost:6379/0' на соответствующую строку подключения.
Как только задача будет зарегистрирована и настроена, мы можем использовать Celery Beat для её периодического вызова.
Подключение Celery Beat
Celery Beat использует специальную таблицу в базе данных для хранения расписания задач. Прежде чем подключать Beat, убедимся, что соответствующий пакет установлен:
pip install django-celery-beat
Далее, настроим приложение. В Django это делается через добавление django_celery_beat в INSTALLED_APPS и выполнение миграций:
python manage.py migrate
Важный момент: в FastAPI интеграция Celery Beat делается через миграцию и настройку базы данных вручную, поскольку нет встроенного механизма для регистрации схем, как в Django. Подробности можно найти в документации django-celery-beat.
Регистрация задачи в Celery Beat
Теперь зарегистрируем нашу задачу в Celery Beat. В Django это можно сделать через панель администратора:
- Перейдите в
/admin/и убедитесь, что там виден раздел Periodic Tasks. - Создайте новую задачу (Periodic Task):
- Название:
send_daily_report - Регулярное выполнение: раз в день.
- Связь с существующей задачей Celery: укажите путь
tasks.send_daily_report.
- Название:
Всё, задача зарегистрирована! Celery Beat позаботится о том, чтобы она выполнялась в указанное время.
Обработка и выполнение регулярных задач
Теперь, когда наша задача зарегистрирована, пора настроить воркеры Celery для её выполнения. Запустим Celery-воркер и Beat:
celery -A tasks worker --loglevel=info
celery -A tasks beat --loglevel=info
Оба этих процесса нужно запустить параллельно. В реальных системах их можно развернуть как независимые службы (например, через systemd или Docker Compose).
Если всё сделано правильно, в консоли воркера вы увидите что-то вроде:
[2025-10-01 00:00:00,123: INFO/MainProcess] Отправка ежедневного отчета началась...
[2025-10-01 00:00:03,456: INFO/MainProcess] Отчет успешно отправлен!
Оптимизация работы задач
Даже с настроенной периодической задачей мы можем столкнуться с рядом вызовов. Например, задача может занять слишком много времени, или база данных для хранения расписаний может стать узким местом. Рассмотрим несколько подходов для оптимизации.
Использование очередей задач с приоритетами
Пусть задача отправки отчетов будет менее приоритетной, чем другие критические задачи. Мы можем настроить отдельную очередь для низкоприоритетных задач. Для этого добавим в celery_app определения очередей:
from kombu import Exchange, Queue
celery_app.conf.task_queues = (
Queue('low_priority', Exchange('low_priority'), routing_key='low_priority'),
)
celery_app.conf.task_default_queue = 'default'
Теперь укажем нашу задачу в "низкоприоритетную" очередь:
@celery_app.task(queue='low_priority')
def send_daily_report():
# Логика задачи остаётся прежней
pass
Мониторинг задач
Для мониторинга задач используется Celery Flower. Убедитесь, что он установлен:
pip install flower
Запустите сервер Flower:
celery -A tasks flower --port=5555
Перейдите в браузере на http://localhost:5555, чтобы увидеть панели мониторинга задач. Здесь вы сможете увидеть все активные задачи, их статус, а также историю выполнения.
Балансировка нагрузки
Если одна машина не справляется с нагрузкой, вы можете распределить выполнение задач между несколькими воркерами. Просто запустите несколько воркеров на разных машинах (или контейнерах):
celery -A tasks worker --loglevel=info --hostname=worker1@%h
celery -A tasks worker --loglevel=info --hostname=worker2@%h
Практический пример настройки задачи
Давайте соберём всё, что мы узнали, в единый пример.
- Подключим Celery и Celery Beat к проекту на FastAPI.
- Создадим задачу отправки отчетов.
- Настроим её выполнение раз в день.
- Оптимизируем систему с помощью низкоприоритетной очереди и мониторинга через flower.
После этого наше приложение будет полностью готово к выполнению регулярных задач!
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ