JavaRush /Курси /Модуль 4: FastAPI /Приклад планування й виконання регулярних задач з Celery

Приклад планування й виконання регулярних задач з Celery

Модуль 4: FastAPI
Рівень 13 , Лекція 9
Відкрита

Вступ

Сьогодні ми реалізуємо приклад повного циклу планування, виконання й оптимізації регулярних задач з використанням Celery і Celery Beat.

Для початку визначимося, яку функціональність хочемо додати в наш застосунок. Уявімо, що в нас є застосунок на FastAPI, який надсилає щоденні звіти користувачам. Відправка таких звітів має бути повністю автоматизована — тут нам допоможе Celery Beat.

Визначення задачі в Celery

Припустимо, ми хочемо щодня надсилати електронною поштою звіти нашим користувачам. Для цього створимо Celery-задачу:


from celery import Celery
from datetime import datetime

# Створюємо об'єкт Celery
celery_app = Celery('tasks', broker='redis://localhost:6379/0')

@celery_app.task
def send_daily_report():
    # Тут ми симулюємо відправлення звіту
    now = datetime.now()
    print(f"[{now}] Відправлення щоденного звіту почалося...")
    # Додайте тут вашу логіку відправлення даних
    print(f"[{now}] Звіт успішно відправлено!")

Примітка: брокером у нас виступає Redis, про який ми говорили раніше. Якщо ви використовуєте RabbitMQ, просто замініть broker='redis://localhost:6379/0' на відповідний рядок підключення.

Як тільки задача буде зареєстрована і налаштована, ми можемо використовувати Celery Beat для її періодичного виклику.

Підключення Celery Beat

Celery Beat використовує спеціальну таблицю в базі даних для зберігання розкладу задач. Перед тим як підключати Beat, переконаємося, що відповідний пакет встановлений:


pip install django-celery-beat

Далі налаштуємо застосунок. У Django це робиться через додавання django_celery_beat в INSTALLED_APPS і виконання міграцій:


python manage.py migrate

Важливий момент: у FastAPI інтеграція Celery Beat робиться через міграцію і налаштування бази даних вручну, бо немає вбудованого механізму для реєстрації схем, як в Django. Деталі можна знайти в документації django-celery-beat.

Реєстрація задачі в Celery Beat

Тепер зареєструємо нашу задачу в Celery Beat. У Django це можна зробити через панель адміністратора:

  1. Перейдіть у /admin/ і переконайтеся, що там видно розділ Periodic Tasks.
  2. Створіть нову задачу (Periodic Task):
    • Назва: send_daily_report
    • Регулярне виконання: раз на день.
    • Зв'язок із існуючою задачею Celery: вкажіть шлях tasks.send_daily_report.

Готово — задача зареєстрована! Celery Beat подбатиме про те, щоб вона виконувалася в зазначений час.


Обробка та виконання регулярних задач

Тепер, коли наша задача зареєстрована, час налаштувати воркери Celery для її виконання. Запустимо Celery-воркер і Beat:


celery -A tasks worker --loglevel=info
celery -A tasks beat --loglevel=info

Обидва ці процеси потрібно запускати паралельно. У реальних системах їх можна розгорнути як незалежні служби (наприклад, через systemd або Docker Compose).

Якщо все зроблено правильно, у консолі воркера ви побачите щось на кшталт:


[2025-10-01 00:00:00,123: INFO/MainProcess] Відправлення щоденного звіту почалося...
[2025-10-01 00:00:03,456: INFO/MainProcess] Звіт успішно відправлено!

Оптимізація роботи задач

Навіть з налаштованою періодичною задачею ми можемо зіткнутися з низкою проблем. Наприклад, задача може займати надто багато часу, або база даних для зберігання розкладів може стати вузьким місцем. Розглянемо кілька підходів для оптимізації.

Використання черг задач з пріоритетами

Нехай задача відправки звітів буде менш пріоритетною, ніж інші критичні задачі. Ми можемо налаштувати окрему чергу для низькоприоритетних задач. Для цього додамо в celery_app визначення черг:


from kombu import Exchange, Queue

celery_app.conf.task_queues = (
    Queue('low_priority', Exchange('low_priority'), routing_key='low_priority'),
)

celery_app.conf.task_default_queue = 'default'

Тепер вкажемо нашу задачу у "низькоприоритетну" чергу:


@celery_app.task(queue='low_priority')
def send_daily_report():
    # Логіка задачі залишається та сама
    pass

Моніторинг задач

Для моніторингу задач використовується Celery Flower. Переконайтеся, що він встановлений:


pip install flower

Запустіть сервер Flower:


celery -A tasks flower --port=5555

Відкрийте в браузері http://localhost:5555, щоб побачити панелі моніторингу задач. Тут ви зможете побачити всі активні задачі, їхній статус, а також історію виконань.

Балансування навантаження

Якщо одна машина не справляється з навантаженням, ви можете розподілити виконання задач між кількома воркерами. Просто запустіть кілька воркерів на різних машинах (або контейнерах):


celery -A tasks worker --loglevel=info --hostname=worker1@%h
celery -A tasks worker --loglevel=info --hostname=worker2@%h

Практичний приклад налаштування задачі

Давайте зберемо все, що дізналися, в один приклад.

  1. Підключимо Celery і Celery Beat до проєкту на FastAPI.
  2. Створимо задачу відправки звітів.
  3. Налаштуємо її виконання раз на день.
  4. Оптимізуємо систему за допомогою низькоприоритетної черги і моніторингу через Flower.

Після цього наш застосунок буде повністю готовий до виконання регулярних задач!

3
Опитування
Використання Celery Beat, рівень 13, лекція 9
Недоступний
Використання Celery Beat
Використання Celery Beat для планування регулярних задач
Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ