Вступ
Сьогодні ми реалізуємо приклад повного циклу планування, виконання й оптимізації регулярних задач з використанням Celery і Celery Beat.
Для початку визначимося, яку функціональність хочемо додати в наш застосунок. Уявімо, що в нас є застосунок на FastAPI, який надсилає щоденні звіти користувачам. Відправка таких звітів має бути повністю автоматизована — тут нам допоможе Celery Beat.
Визначення задачі в Celery
Припустимо, ми хочемо щодня надсилати електронною поштою звіти нашим користувачам. Для цього створимо Celery-задачу:
from celery import Celery
from datetime import datetime
# Створюємо об'єкт Celery
celery_app = Celery('tasks', broker='redis://localhost:6379/0')
@celery_app.task
def send_daily_report():
# Тут ми симулюємо відправлення звіту
now = datetime.now()
print(f"[{now}] Відправлення щоденного звіту почалося...")
# Додайте тут вашу логіку відправлення даних
print(f"[{now}] Звіт успішно відправлено!")
Примітка: брокером у нас виступає Redis, про який ми говорили раніше. Якщо ви використовуєте RabbitMQ, просто замініть broker='redis://localhost:6379/0' на відповідний рядок підключення.
Як тільки задача буде зареєстрована і налаштована, ми можемо використовувати Celery Beat для її періодичного виклику.
Підключення Celery Beat
Celery Beat використовує спеціальну таблицю в базі даних для зберігання розкладу задач. Перед тим як підключати Beat, переконаємося, що відповідний пакет встановлений:
pip install django-celery-beat
Далі налаштуємо застосунок. У Django це робиться через додавання django_celery_beat в INSTALLED_APPS і виконання міграцій:
python manage.py migrate
Важливий момент: у FastAPI інтеграція Celery Beat робиться через міграцію і налаштування бази даних вручну, бо немає вбудованого механізму для реєстрації схем, як в Django. Деталі можна знайти в документації django-celery-beat.
Реєстрація задачі в Celery Beat
Тепер зареєструємо нашу задачу в Celery Beat. У Django це можна зробити через панель адміністратора:
- Перейдіть у
/admin/і переконайтеся, що там видно розділ Periodic Tasks. - Створіть нову задачу (Periodic Task):
- Назва:
send_daily_report - Регулярне виконання: раз на день.
- Зв'язок із існуючою задачею Celery: вкажіть шлях
tasks.send_daily_report.
- Назва:
Готово — задача зареєстрована! Celery Beat подбатиме про те, щоб вона виконувалася в зазначений час.
Обробка та виконання регулярних задач
Тепер, коли наша задача зареєстрована, час налаштувати воркери Celery для її виконання. Запустимо Celery-воркер і Beat:
celery -A tasks worker --loglevel=info
celery -A tasks beat --loglevel=info
Обидва ці процеси потрібно запускати паралельно. У реальних системах їх можна розгорнути як незалежні служби (наприклад, через systemd або Docker Compose).
Якщо все зроблено правильно, у консолі воркера ви побачите щось на кшталт:
[2025-10-01 00:00:00,123: INFO/MainProcess] Відправлення щоденного звіту почалося...
[2025-10-01 00:00:03,456: INFO/MainProcess] Звіт успішно відправлено!
Оптимізація роботи задач
Навіть з налаштованою періодичною задачею ми можемо зіткнутися з низкою проблем. Наприклад, задача може займати надто багато часу, або база даних для зберігання розкладів може стати вузьким місцем. Розглянемо кілька підходів для оптимізації.
Використання черг задач з пріоритетами
Нехай задача відправки звітів буде менш пріоритетною, ніж інші критичні задачі. Ми можемо налаштувати окрему чергу для низькоприоритетних задач. Для цього додамо в celery_app визначення черг:
from kombu import Exchange, Queue
celery_app.conf.task_queues = (
Queue('low_priority', Exchange('low_priority'), routing_key='low_priority'),
)
celery_app.conf.task_default_queue = 'default'
Тепер вкажемо нашу задачу у "низькоприоритетну" чергу:
@celery_app.task(queue='low_priority')
def send_daily_report():
# Логіка задачі залишається та сама
pass
Моніторинг задач
Для моніторингу задач використовується Celery Flower. Переконайтеся, що він встановлений:
pip install flower
Запустіть сервер Flower:
celery -A tasks flower --port=5555
Відкрийте в браузері http://localhost:5555, щоб побачити панелі моніторингу задач. Тут ви зможете побачити всі активні задачі, їхній статус, а також історію виконань.
Балансування навантаження
Якщо одна машина не справляється з навантаженням, ви можете розподілити виконання задач між кількома воркерами. Просто запустіть кілька воркерів на різних машинах (або контейнерах):
celery -A tasks worker --loglevel=info --hostname=worker1@%h
celery -A tasks worker --loglevel=info --hostname=worker2@%h
Практичний приклад налаштування задачі
Давайте зберемо все, що дізналися, в один приклад.
- Підключимо Celery і Celery Beat до проєкту на FastAPI.
- Створимо задачу відправки звітів.
- Налаштуємо її виконання раз на день.
- Оптимізуємо систему за допомогою низькоприоритетної черги і моніторингу через Flower.
Після цього наш застосунок буде повністю готовий до виконання регулярних задач!
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ