JavaRush /Курсы /Модуль 4: FastAPI /Как обрабатывать ошибки в RabbitMQ и Celery

Как обрабатывать ошибки в RabbitMQ и Celery

Модуль 4: FastAPI
15 уровень , 1 лекция
Открыта

Начнем с RabbitMQ, который, как добрый почтовый голубь, доставляет сообщения вашему потребителю (consumer). Но что делать, если голубь устал и выронил письмо?

Подтверждения сообщений (Acknowledgments)

RabbitMQ позволяет установить механизмы подтверждений, благодаря которым вы не потеряете сообщения, даже если потребитель решит устроить себе отпуск:

  • Подтверждение сообщения вручную: потребитель явно сообщает RabbitMQ, что обработал сообщение успешно. Это снижает вероятность потери данных.
  • Отказ сообщения (Reject): если задача не выполнена, RabbitMQ может либо повторно отправить сообщение другому потребителю, либо переместить его в специальную очередь.

Пример настройки подтверждений:


import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Объявляем очередь
channel.queue_declare(queue='example_queue', durable=True)

# Функция обработки сообщений
def callback(ch, method, properties, body):
    print(f"Получено сообщение: {body}")
    try:
        # Здесь обрабатываем сообщение
        do_processing(body) 
        # Подтверждаем, что обработка завершена успешно
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Ошибка: {e}")
        # Отказываться от сообщения
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

channel.basic_consume(
    queue='example_queue', 
    on_message_callback=callback
)

print('Ожидание сообщений...')
channel.start_consuming()

Здесь, если обработка сообщения завершилась неудачно, мы явно сообщаем RabbitMQ, что сообщение не обработано, чтобы оно либо ушло в Dead-letter queue, либо просто отбросилось.

Dead-letter Exchange (DLX)

Для сообщений, которые невозможно доставить или обработать, RabbitMQ поддерживает Dead-letter Exchange. Это, по сути, "кладбище" сообщений, где они ждут повторного использования или анализа.

Настройка DLX:

  1. Создаем exchange для "мертвых" сообщений.
  2. Привязываем его к новой очереди.
  3. Настраиваем исходную очередь на использование DLX.

Пример:


# Создаем "кладбище" для сообщений
rabbitmqadmin declare exchange name=dlx_exchange type=direct
rabbitmqadmin declare queue name=dlx_queue durable=true
rabbitmqadmin declare binding source=dlx_exchange destination=dlx_queue

# Настраиваем исходную очередь на использование DLX
rabbitmqadmin declare queue name=example_queue durable=true arguments='{"x-dead-letter-exchange":"dlx_exchange"}'

Теперь, если сообщение не будет обработано, оно автоматически попадет в dlx_queue.


Обработка ошибок в Celery

Теперь перейдем к Celery. Если RabbitMQ — это почтальон, то Celery — это мастер на все руки, который выполняет задачи (и иногда ошибается). Здесь мы поговорим о перезапусках задач (retry) и других способах обработки ошибок.

Модели RetryPolicy для повторного выполнения задач

Celery поддерживает встроенные механизмы перезапуска задач. Например, если задача не выполнилась из-за временной ошибки (например, сеть была недоступна), вы можете попробовать выполнить её снова через некоторое время.

Пример простого retry:


from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task(bind=True, default_retry_delay=5 * 60, max_retries=3)  # Повтор через 5 минут, максимум 3 раза
def my_task(self, arg):
    try:
        # Выполняем задачу
        do_something(arg)
    except Exception as exc:
        # Если ошибка, пробуем ещё раз
        raise self.retry(exc=exc)

Особенности конфигурации retry:

  • default_retry_delay: время между попытками.
  • max_retries: максимальное количество попыток.
  • Используйте self.retry вместо стандартного try-except, чтобы Celery автоматически обработал повтор.

Таймауты задач

Вы также можете настроить максимальное время выполнения задачи. Например, если задача "застряла", её выполнение должно автоматически прерваться.

Пример:


@app.task(time_limit=30)  # Задача будет принудительно завершена через 30 секунд
def long_running_task(arg):
    # Ваша длительная задача
    do_some_long_work(arg)

Как Celery и RabbitMQ работают вместе для обработки ошибок

Чтобы связать Celery и RabbitMQ в одну эффективную систему, вам нужно настроить:

  1. Подтверждения от Celery: Celery автоматически отправляет подтверждения в RabbitMQ о выполнении задач.
  2. Использование DLX: свяжите RabbitMQ и Celery так, чтобы неудачные задачи отправлялись в Dead-letter queue.

Пример:

В конфигурации Celery укажите параметр:


app.conf.task_acks_late = True  # Подтверждение только после успешного выполнения задачи
app.conf.task_reject_on_worker_lost = True  # Передача задачи обратно в очередь при сбое воркера

Теперь, если воркер "умер" во время выполнения задачи, RabbitMQ вернёт её обратно в очередь или отправит в DLX.


Практический пример: восстановление задач после ошибок

Давайте соберем воедино:

  1. Настраиваем RabbitMQ для работы с DLX.
  2. Создаём задачу Celery с retry и логированием ошибок.
  3. Добавляем таймауты на задачи.

# Конфигурируем Celery
app.conf.update(
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    worker_disable_rate_limits=True
)

# Настраиваем задачу с обработкой ошибок
@app.task(bind=True, default_retry_delay=10, max_retries=5, time_limit=30)
def process_data(self, data):
    try:
        # Логика задачи
        result = do_important_work(data)
        return result
    except ConnectionError as exc:
        self.retry(exc=exc)  # Пробуем снова при сетевой ошибке
    except Exception as exc:
        # Логгируем другие ошибки
        logger.error(f"Ошибка обработки: {exc}")
        raise

Если задача не смогла выполниться 5 раз, она попадёт в DLX, настроенный ранее в RabbitMQ.


Важные моменты

  • Настройка подтверждений и DLX в RabbitMQ помогает избежать потери сообщений.
  • Использование retry и таймаутов в Celery делает систему более устойчивой.
  • Правильное логирование всех ошибок — ваш лучший друг. Используйте внешние инструменты, такие как Sentry, для мониторинга.

Ну что, теперь ошибки — не враги, а просто часть вашей работы. Как говорил великий Боб: "Эта ошибка не моя, я её просто наследую!". Проверяйте свой код, настраивайте обработку ошибок, и пусть ваш асинхронный код работает как часы (но не падает!).


Рекомендуемая документация:

1
Задача
Модуль 4: FastAPI, 15 уровень, 1 лекция
Недоступна
Реализация Dead-letter Exchange (DLX) в RabbitMQ
Реализация Dead-letter Exchange (DLX) в RabbitMQ
1
Задача
Модуль 4: FastAPI, 15 уровень, 1 лекция
Недоступна
Задача с перезапуском в Celery
Задача с перезапуском в Celery
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ