JavaRush /Курсы /Модуль 4: FastAPI /Пример планирования и выполнения регулярных задач с Celer...

Пример планирования и выполнения регулярных задач с Celery

Модуль 4: FastAPI
13 уровень , 9 лекция
Открыта

Введение

Сегодня мы реализуем пример полного цикла планирования, выполнения и оптимизации регулярных задач с использованием Celery и Celery Beat.

Для начала определимся, какую функциональность мы хотим добавить в наше приложение. Представим, что у нас есть приложение на FastAPI, которое отправляет ежедневные отчёты пользователям. Отправка таких отчётов должна быть полностью автоматизирована, и здесь нам поможет Celery Beat.

Определение задачи в Celery

Допустим, мы хотим ежедневно отправлять по электронной почте отчёты нашим пользователям. Для этого создадим Celery-задачу:


from celery import Celery
from datetime import datetime

# Создаем объект Celery
celery_app = Celery('tasks', broker='redis://localhost:6379/0')

@celery_app.task
def send_daily_report():
    # Здесь мы симулируем отправку отчета
    now = datetime.now()
    print(f"[{now}] Отправка ежедневного отчета началась...")
    # Добавьте вашу логику отправки данных тут
    print(f"[{now}] Отчет успешно отправлен!")

Примечание: брокером у нас выступает Redis, о котором мы говорили ранее. Если вы используете RabbitMQ, просто замените broker='redis://localhost:6379/0' на соответствующую строку подключения.

Как только задача будет зарегистрирована и настроена, мы можем использовать Celery Beat для её периодического вызова.

Подключение Celery Beat

Celery Beat использует специальную таблицу в базе данных для хранения расписания задач. Прежде чем подключать Beat, убедимся, что соответствующий пакет установлен:


pip install django-celery-beat

Далее, настроим приложение. В Django это делается через добавление django_celery_beat в INSTALLED_APPS и выполнение миграций:


python manage.py migrate

Важный момент: в FastAPI интеграция Celery Beat делается через миграцию и настройку базы данных вручную, поскольку нет встроенного механизма для регистрации схем, как в Django. Подробности можно найти в документации django-celery-beat.

Регистрация задачи в Celery Beat

Теперь зарегистрируем нашу задачу в Celery Beat. В Django это можно сделать через панель администратора:

  1. Перейдите в /admin/ и убедитесь, что там виден раздел Periodic Tasks.
  2. Создайте новую задачу (Periodic Task):
    • Название: send_daily_report
    • Регулярное выполнение: раз в день.
    • Связь с существующей задачей Celery: укажите путь tasks.send_daily_report.

Всё, задача зарегистрирована! Celery Beat позаботится о том, чтобы она выполнялась в указанное время.


Обработка и выполнение регулярных задач

Теперь, когда наша задача зарегистрирована, пора настроить воркеры Celery для её выполнения. Запустим Celery-воркер и Beat:


celery -A tasks worker --loglevel=info
celery -A tasks beat --loglevel=info

Оба этих процесса нужно запустить параллельно. В реальных системах их можно развернуть как независимые службы (например, через systemd или Docker Compose).

Если всё сделано правильно, в консоли воркера вы увидите что-то вроде:


[2025-10-01 00:00:00,123: INFO/MainProcess] Отправка ежедневного отчета началась...
[2025-10-01 00:00:03,456: INFO/MainProcess] Отчет успешно отправлен!

Оптимизация работы задач

Даже с настроенной периодической задачей мы можем столкнуться с рядом вызовов. Например, задача может занять слишком много времени, или база данных для хранения расписаний может стать узким местом. Рассмотрим несколько подходов для оптимизации.

Использование очередей задач с приоритетами

Пусть задача отправки отчетов будет менее приоритетной, чем другие критические задачи. Мы можем настроить отдельную очередь для низкоприоритетных задач. Для этого добавим в celery_app определения очередей:


from kombu import Exchange, Queue

celery_app.conf.task_queues = (
    Queue('low_priority', Exchange('low_priority'), routing_key='low_priority'),
)

celery_app.conf.task_default_queue = 'default'

Теперь укажем нашу задачу в "низкоприоритетную" очередь:


@celery_app.task(queue='low_priority')
def send_daily_report():
    # Логика задачи остаётся прежней
    pass

Мониторинг задач

Для мониторинга задач используется Celery Flower. Убедитесь, что он установлен:


pip install flower

Запустите сервер Flower:


celery -A tasks flower --port=5555

Перейдите в браузере на http://localhost:5555, чтобы увидеть панели мониторинга задач. Здесь вы сможете увидеть все активные задачи, их статус, а также историю выполнения.

Балансировка нагрузки

Если одна машина не справляется с нагрузкой, вы можете распределить выполнение задач между несколькими воркерами. Просто запустите несколько воркеров на разных машинах (или контейнерах):


celery -A tasks worker --loglevel=info --hostname=worker1@%h
celery -A tasks worker --loglevel=info --hostname=worker2@%h

Практический пример настройки задачи

Давайте соберём всё, что мы узнали, в единый пример.

  1. Подключим Celery и Celery Beat к проекту на FastAPI.
  2. Создадим задачу отправки отчетов.
  3. Настроим её выполнение раз в день.
  4. Оптимизируем систему с помощью низкоприоритетной очереди и мониторинга через flower.

После этого наше приложение будет полностью готово к выполнению регулярных задач!

1
Задача
Модуль 4: FastAPI, 13 уровень, 9 лекция
Недоступна
Регулярная отправка сообщений
Регулярная отправка сообщений
1
Задача
Модуль 4: FastAPI, 13 уровень, 9 лекция
Недоступна
Параллельное выполнение регулярных задач
Параллельное выполнение регулярных задач
3
Опрос
Использование Celery Beat, 13 уровень, 9 лекция
Недоступен
Использование Celery Beat
Использование Celery Beat для планирования регулярных задач
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ