JavaRush /Курсы /Модуль 4: FastAPI /Как отслеживать выполнение задач и их статус

Как отслеживать выполнение задач и их статус

Модуль 4: FastAPI
13 уровень , 5 лекция
Открыта

Теперь настало время разобраться с одним из самых востребованных вопросов: «А как мне узнать, выполнилась ли моя задача, и если да, то с каким результатом?» Ведь в реальном мире оставлять задачи без отслеживания — это как отправить важное письмо без трекинга, надеясь, что оно «как-нибудь дойдёт».

Цель отслеживания задач проста: вы хотите убедиться, что задача:

  1. Успешно выполнена.
  2. Выполнена в нужные сроки.
  3. Не упала с ошибкой.
  4. Вернула результат, если он необходим.

А представьте, что у вас в проекте тысячи задач. Например, массовая рассылка писем или обработка загрузки файлов. Как вы узнаете, сколько задач выполнилось и сколько ещё в очереди? Для этого Celery предоставляет простой и эффективный механизм мониторинга.


Основные механизмы отслеживания статусов задач

1. Task Result Backend

Celery использует механизм «backends» для хранения данных о заданиях. Backend отвечает за сохранение:

  • статуса задачи (в ожидании, выполняется, выполнена, упала с ошибкой);
  • результата задачи (если это необходимо).

Примеры популярных backend'ов:

  • Redis — быстрый и лёгкий в настройке.
  • RabbitMQ — полезен, если вы уже используете его как брокер сообщений.
  • Database (SQL) — сохраняет статусы в базе данных.
  • MongoDB — если вы уже используете его как часть системы.

Для простоты и скорости мы будем использовать Redis в качестве нашего backend'а.

Настройка backend для Celery

Сначала убедитесь, что Redis настроен и запущен. Если вы не установили Redis, сделайте это с помощью Docker:

docker run -d -p 6379:6379 redis

Теперь обновим конфигурацию Celery:

config.py


from celery import Celery

app = Celery(
    "my_project",
    broker="redis://localhost:6379/0",  # Брокер
    backend="redis://localhost:6379/0",  # Backend для хранения результатов
)

app.conf.task_track_started = True  # Включить отслеживание выполнения
app.conf.task_serializer = "json"
app.conf.result_serializer = "json"
app.conf.accept_content = ["json"]

Включив backend, мы теперь можем получить результат любой задачи с помощью уникального task ID.

2. Получение результата задачи

Celery предоставляет удобный способ получения результатов через AsyncResult. Каждый раз, когда задача отправляется на выполнение, возвращается объект, содержащий её уникальный ID. Именно этот ID можно использовать для проверки статуса и результата.

Пример задачи:

tasks.py


from time import sleep
from config import app

@app.task
def add(x, y):
    sleep(5)  # Имитация долгой задачи
    return x + y

Отправим задачу на выполнение и отслеживаем её:

main.py


from tasks import add
from celery.result import AsyncResult

# Отправляем задачу на выполнение
task = add.apply_async((10, 20))

# Получаем уникальный идентификатор задачи
print(f"Task ID: {task.id}")

# Теперь можем проверять статус или результат задачи
task_result = AsyncResult(task.id)

while not task_result.ready():
    print(f"Task status: {task_result.status}")  # Ожидаем статус: PENDING или STARTED
    sleep(1)

# Когда задача завершена
if task_result.successful():
    print(f"Task finished successfully. Result: {task_result.result}")
else:
    print(f"Task failed with status: {task_result.status}")

Этот подход особенно полезен в сценариях, когда вам нужно дождаться завершения задачи или обработать ошибки.


Типичные статусы задач в Celery

Статус Описание
PENDING Задача ожидает выполнения (не назначена рабочему)
STARTED Задача начала выполняться (если включено task_track_started)
RETRY Задача была переотправлена (например, при сбое подключения)
FAILURE Задача завершилась ошибкой
SUCCESS Задача успешно завершилась и результат доступен

3. Использование Celery Flower для мониторинга

Если вам нужно управлять несколькими задачами, познакомьтесь с Celery Flower — это веб-интерфейс для мониторинга задач и рабочих процессов.

Установить Flower можно через pip:

pip install flower

Запустите сервер Flower:

celery -A config flower

Теперь откройте http://localhost:5555 в браузере, и вы увидите:

  • Список всех задач.
  • Их текущий статус.
  • Работу воркеров.
  • Возможность завершить задачу или повторно её исполнить.

Управление результатами задач

В некоторых случаях вы не хотите хранить результат задачи после её выполнения. Например, если обрабатываются тысячи задач, размер хранилища может быстро увеличиться. Celery позволяет настроить автоматическое время жизни результатов через task_result_expires.

Пример настройки:

config.py


app.conf.result_expires = 3600  # Удалить результаты через 1 час

Это полезно для экономии ресурсов, если вам не нужно долго хранить информацию о старых задачах.


Логирование задач и отслеживание ошибок

Для эффективного мониторинга полезно вести журнал (лог) выполнения задач. Celery встроено поддерживает логирование через Python.

Пример настройки логирования:

config.py


import logging

app.conf.worker_log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
app.conf.worker_task_log_format = "%(asctime)s - %(task_name)s - %(task_id)s - %(levelname)s - %(message)s"

# Пример логирования задач
logger = logging.getLogger("celery")

@app.task
def debug_task():
    logger.info("Task started")
    return "Debug completed"

Все задачи будут логироваться автоматически с указанием их статусов, ошибок и выполненных действий.


Практический пример: отправка писем с логированием результата

Рассмотрим реальный сценарий. У нас есть сайт, который отправляет email после регистрации пользователя.

tasks.py


from config import app

@app.task(bind=True)
def send_email(self, email, content):
    try:
        # Имитация отправки письма
        print(f"Sending email to {email}")
        return f"Email sent to {email}"
    except Exception as e:
        self.retry(exc=e, countdown=5, max_retries=3)  # Повторить задачу в случае ошибки

main.py


from tasks import send_email
from celery.result import AsyncResult

# Отправка
task = send_email.delay("user@example.com", "Welcome to our service!")

# Отслеживание результата
task_result = AsyncResult(task.id)
if task_result.successful():
    print(f"Task completed: {task_result.result}")
elif task_result.failed():
    print("Task failed.")

Этот пример показывает, как безопасно обрабатывать задачи с повторами и минимизировать ошибки.


Особенности и подводные камни

Всё кажется лёгким, но есть пара моментов, которые стоит учитывать. Иногда кажется, что задача пропала — это может быть связано с неправильной конфигурацией брокера или backend'а. Убедитесь, что ваш брокер работает стабильно, и проверьте логи Celery, если задачи переходят в статус PENDING.

Бывает, что задачи выполняются слишком долго или воркеры зависают. Решение — увеличить таймауты или разделить задачи на мелкие подзадачи.

Теперь вы вооружены знаниями для мониторинга и уверенного управления задачами.

1
Задача
Модуль 4: FastAPI, 13 уровень, 5 лекция
Недоступна
Отслеживание статуса задачи
Отслеживание статуса задачи
1
Задача
Модуль 4: FastAPI, 13 уровень, 5 лекция
Недоступна
Логирование статусов задач
Логирование статусов задач
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ