JavaRush /Курсы /Модуль 4: FastAPI /Использование Celery Beat для планирования регулярных зад...

Использование Celery Beat для планирования регулярных задач

Модуль 4: FastAPI
13 уровень , 7 лекция
Открыта

В этой лекции мы с вами поговорим про Celery Beat — расширение для Celery, которое позволяет с легкостью создавать и планировать регулярные задачи. Если вы хоть раз задумывались, как автоматизировать выполнение задач в вашем приложении по расписанию, будь то еженедельная отправка писем, удаление устаревших данных или регулярное обновление отчётов, то вы попали по адресу.

Celery Beat — это дополнительный инструмент для планирования задач в Celery. Он работает как "будильник" для ваших задач, напоминая системе, что пора выполнять определённую задачу в указанное время. Например, если вы хотите запускать задачу каждые 5 минут или каждый понедельник в 8 утра, Celery Beat справится с этим идеально.

Преимущества использования Celery Beat

  • Простота настройки расписания (избавляет от необходимости писать сложные планировщики).
  • Полная интеграция с Celery, что означает, что ваши задачи уже готовы для использования.
  • Возможность динамического изменения расписания без необходимости перезапускать приложение.

Как это работает?

Celery Beat хранит расписание выполнения задач в базе данных, читает его и отправляет задачи в Celery в соответствии с указанным временем.

Архитектурно это выглядит так:


Celery Beat Scheduler (читаем расписание из БД) --> Отправляем задачи в брокер (RabbitMQ/Redis) --> Celery Worker выполняет задачи.

Установка и настройка Celery Beat

Прежде чем наслаждаться магией Celery Beat, его нужно установить и настроить.

Celery Beat можно установить командой:

pip install django-celery-beat

Если вы работаете с FastAPI, подход тот же. Однако для Django это расширение изначально адаптировано, поэтому многие готовые функции будут работать "из коробки".

  1. Добавляем приложение Celery Beat:
    В settings.py добавляем django_celery_beat в список INSTALLED_APPS:
    
    INSTALLED_APPS = [
        ...
        'django_celery_beat',
    ]
    
  2. Применяем миграции:
    После добавления приложения, необходимо применить миграции. Celery Beat использует таблицы в базе данных для хранения расписания.
    python manage.py migrate
    
  3. Настраиваем конфигурацию Celery:
    В файл celery.py вашего проекта добавьте следующий код:
    
    from celery import Celery
    
    app = Celery('your_project_name')
    
    # Настройки из settings.py
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # Автоматическая регистрация задач
    app.autodiscover_tasks()
    
    # Включение Beat
    app.conf.beat_scheduler = 'django_celery_beat.schedulers.DatabaseScheduler'
    

    Эта настройка активирует использование базы данных для хранения расписания.

  4. Запуск Celery Beat:
    Чтобы запустить Celery Beat, используйте команду:
    celery -A your_project_name beat --loglevel=info
    

    Beat начнёт работать в качестве планировщика, проверяя расписание в базе данных.


Настройка периодических задач

Теперь, когда Celery Beat настроен, давайте создадим несколько задач, которые будут выполняться по расписанию.

Пример задачи в файле tasks.py:


from celery import shared_task

@shared_task
def my_periodic_task():
    print("Эта задача выполняется по расписанию!")

Добавление задачи в расписание через Django Admin

  1. Запустите сервер Django и войдите в административную панель.
  2. В разделе "Periodic Tasks" нажмите кнопку "Add".
  3. Заполните следующие поля:
    • Name: имя задачи (например, "Приветствие").
    • Task (выбор): выберите задачу my_periodic_task из выпадающего списка.
    • Interval: выберите интервал выполнения задачи (например, "Каждый 1 минут").
  4. Нажмите "Сохранить".

Готово! Вы только что создали задачу, которая будет выполняться регулярно.


Примеры настройки расписания

Celery Beat поддерживает два типа интервалов для настройки задач: Interval Schedule и Crontab Schedule.

Interval Schedule используется, когда вы хотите выполнять задачу через определённые промежутки времени.

Пример: задача выполняется каждые 10 минут.

  1. В Django Admin зайдите в "Interval Schedules".
  2. Добавьте новый интервал:
    • Every: 10
    • Period: минут
  3. Назначьте этот интервал для Periodic Task.

Crontab Schedule — это более гибкий способ, когда вам нужно указать точное время или дни. Например, "каждый понедельник в 9:00".

Пример настройки:

  1. В Django Admin зайдите в "Crontab Schedules".
  2. Создайте новый Crontab:
    • Minute: 0
    • Hour: 9
    • Day of week: 1 (понедельник)
    • Month of year: *
  3. Назначьте этот Crontab для Periodic Task.
Подсказка:

Если вы забыли синтаксис Crontab, воспользуйтесь crontab.guru — удобным инструментом для тестирования расписания.


Примеры настройки в FastAPI

Если вы работаете с FastAPI, то Celery Beat также можно использовать. Основной принцип остаётся тот же: задачи Celery регистрируются и подключаются к планировщику с использованием Crontab или Interval.

Пример задачи:


from celery import Celery

celery_app = Celery('my_project', broker='redis://localhost')

@celery_app.task
def welcome_task():
    print("Привет от задачи Celery в FastAPI!")

Задачи настраиваются аналогично Django — через базу данных. Разница лишь в том, как вы интегрируете интерфейс (например, через REST).


Мониторинг выполнения задач

Для мониторинга выполнения задач после их планирования можно использовать Celery Flower.

Если вы пропустили это в предыдущих лекциях, вот краткое напоминание:

Установите Flower через pip:

pip install flower

Запустите Flower для вашего проекта:

celery -A your_project_name flower --port=5555

Flower предоставляет веб-интерфейс, где можно отслеживать выполнение задач и их статус.


Практические примеры планирования задач

1. Удаление старых данных из базы


@shared_task
def delete_old_entries():
    from my_app.models import MyModel
    MyModel.objects.filter(created_at__lt=datetime.now()-timedelta(days=30)).delete()

Настройте Crontab для еженедельного выполнения этой задачи.

2. Отправка ежемесячных отчётов


@shared_task
def send_monthly_report():
    print("Ежемесячный отчёт отправлен!")

Настройте Crontab для выполнения в первый день каждого месяца.


Minute: 0
Hour: 9
Day of month: 1

3. Обновление данных из API


@shared_task
def update_data_from_api():
    # Логика обновления из API
    print("Данные успешно обновлены!")

Настройте Interval Schedule для выполнения каждые 12 часов.


Дополнительные ресурсы

1
Задача
Модуль 4: FastAPI, 13 уровень, 7 лекция
Недоступна
Настройка Crontab Schedule для периодической задачи
Настройка Crontab Schedule для периодической задачи
1
Задача
Модуль 4: FastAPI, 13 уровень, 7 лекция
Недоступна
Планирование нескольких задач с помощью Celery Beat
Планирование нескольких задач с помощью Celery Beat
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ