Тепер настав час розібратися з одним із найзапитуваніших питань: «А як мені дізнатися, чи виконалася моя задача, і якщо так — з яким результатом?» Бо в реальному житті залишати задачі без відстеження — це як відправити важливий лист без трекінгу, сподіваючись, що він «якось дійде».
Мета відстеження задач проста: ти хочеш переконатися, що задача:
- Успішно виконана.
- Виконана в потрібні терміни.
- Не впала з помилкою.
- Повернула результат, якщо це потрібно.
А уяви, що в проєкті ти маєш тисячі задач. Наприклад, масова розсилка листів або обробка завантажених файлів. Як дізнатися, скільки задач виконано і скільки ще в черзі? Для цього Celery дає простий і ефективний механізм моніторингу.
Основні механізми відстеження статусів задач
1. Task Result Backend
Celery використовує механізм «backends» для зберігання даних про завдання. Backend відповідає за збереження:
- статусу задачі (в очікуванні, виконується, виконана, впала з помилкою);
- результату задачі (якщо це потрібно).
Приклади популярних backend'ів:
- Redis — швидкий і простий у налаштуванні.
- RabbitMQ — корисний, якщо ти вже використовуєш його як брокер повідомлень.
- Database (SQL) — зберігає статуси в базі даних.
- MongoDB — якщо ти вже використовуєш його в системі.
Для простоти і швидкості ми будемо використовувати Redis як наш backend.
Налаштування backend для Celery
Спочатку переконайся, що Redis налаштований і запущений. Якщо ти не встановив Redis, зроби це через Docker:
docker run -d -p 6379:6379 redis
Тепер оновимо конфігурацію Celery:
config.py
from celery import Celery
app = Celery(
"my_project",
broker="redis://localhost:6379/0", # Брокер
backend="redis://localhost:6379/0", # Backend для зберігання результатів
)
app.conf.task_track_started = True # Увімкнути відстеження виконання
app.conf.task_serializer = "json"
app.conf.result_serializer = "json"
app.conf.accept_content = ["json"]
Увімкнувши backend, ми тепер можемо отримати результат будь-якої задачі за її унікальним task ID.
2. Отримання результату задачі
Celery дає зручний спосіб отримувати результати через AsyncResult. Кожного разу, коли задача відправляється на виконання, повертається об'єкт з її унікальним ID. Саме цей ID можна використовувати для перевірки статусу і результату.
Приклад задачі:
tasks.py
from time import sleep
from config import app
@app.task
def add(x, y):
sleep(5) # Імітація довгої задачі
return x + y
Відправимо задачу на виконання і відстежимо її:
main.py
from tasks import add
from celery.result import AsyncResult
# Відправляємо задачу на виконання
task = add.apply_async((10, 20))
# Отримуємо унікальний ідентифікатор задачі
print(f"Task ID: {task.id}")
# Тепер можемо перевіряти статус або результат задачі
task_result = AsyncResult(task.id)
while not task_result.ready():
print(f"Task status: {task_result.status}") # Очікуємо статус: PENDING або STARTED
sleep(1)
# Коли задача завершена
if task_result.successful():
print(f"Task finished successfully. Result: {task_result.result}")
else:
print(f"Task failed with status: {task_result.status}")
Такий підхід особливо корисний в сценаріях, коли потрібно дочекатися закінчення задачі або обробити помилки.
Типові статуси задач у Celery
| Статус | Опис |
|---|---|
PENDING |
Задача очікує виконання (не призначена worker-у) |
STARTED |
Задача почала виконуватися (якщо увімкнено task_track_started) |
RETRY |
Задача була повторно відправлена (наприклад, при збої підключення) |
FAILURE |
Задача завершилася з помилкою |
SUCCESS |
Задача успішно завершена і результат доступний |
3. Використання Celery Flower для моніторингу
Якщо потрібно керувати великою кількістю задач, познайомся з Celery Flower — це веб-інтерфейс для моніторингу задач і воркерів.
Встановити Flower можна через pip:
pip install flower
Запусти сервер Flower:
celery -A config flower
Тепер відкрий http://localhost:5555 в браузері, і ти побачиш:
- Список усіх задач.
- Їхній поточний статус.
- Роботу воркерів.
- Можливість завершити задачу або повторно її виконати.
Управління результатами задач
Інколи ти не хочеш зберігати результат задачі після її виконання. Наприклад, якщо обробляються тисячі задач, то розмір сховища може швидко зрости. Celery дозволяє налаштувати автоматичний час життя результатів через task_result_expires.
Приклад налаштування:
config.py
app.conf.result_expires = 3600 # Видалити результати через 1 годину
Це корисно для економії ресурсів, якщо не потрібно довго зберігати інформацію про старі задачі.
Логування задач і відстеження помилок
Для ефективного моніторингу корисно вести журнал (лог) виконання задач. Celery має вбудовану підтримку логування через Python.
Приклад налаштування логування:
config.py
import logging
app.conf.worker_log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
app.conf.worker_task_log_format = "%(asctime)s - %(task_name)s - %(task_id)s - %(levelname)s - %(message)s"
# Приклад логування задач
logger = logging.getLogger("celery")
@app.task
def debug_task():
logger.info("Task started")
return "Debug completed"
Усі задачі будуть логуватися автоматично з вказівкою їхніх статусів, помилок і виконаних дій.
Практичний приклад: відправка листів з логуванням результату
Розглянемо реальний сценарій. У нас є сайт, який надсилає email після реєстрації користувача.
tasks.py
from config import app
@app.task(bind=True)
def send_email(self, email, content):
try:
# Імітація відправки листа
print(f"Sending email to {email}")
return f"Email sent to {email}"
except Exception as e:
self.retry(exc=e, countdown=5, max_retries=3) # Повторити задачу у разі помилки
main.py
from tasks import send_email
from celery.result import AsyncResult
# Відправка
task = send_email.delay("user@example.com", "Welcome to our service!")
# Відстеження результату
task_result = AsyncResult(task.id)
if task_result.successful():
print(f"Task completed: {task_result.result}")
elif task_result.failed():
print("Task failed.")
Цей приклад показує, як безпечно обробляти задачі з повторними спробами і мінімізувати помилки.
Особливості й підводні камені
Здається, що все просто, але є кілька моментів, які варто врахувати. Іноді здається, що задача зникла — це може бути пов'язано з неправильною конфігурацією брокера або backend'а. Переконайся, що твій брокер працює стабільно, і перевір логи Celery, якщо задачі переходять у статус PENDING.
Буває, що задачі виконуються занадто довго або воркери зависають. Рішення — збільшити таймаути або розбити задачу на дрібніші підзадачі.
Тепер ти озброєний знаннями для моніторингу та впевненого управління задачами.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ