JavaRush /Курсы /Модуль 4: FastAPI /Пример обработки недоставленных сообщений и их повторной ...

Пример обработки недоставленных сообщений и их повторной обработки

Модуль 4: FastAPI
15 уровень , 4 лекция
Открыта

Добро пожаловать на одну из самых "жизненных" лекций нашего курса! Сегодня мы поговорим о том, как справляться с ситуацией, когда сообщения "теряются" в асинхронных системах. Мы рассмотрим использование Dead-letter queues (DLQ) для их обработки и реализацию повторной доставки сообщений. Готовьтесь разбираться с реальными ошибками и восстанавливать порядок в нашем приложении. А теперь — в бой!

Давайте начнем с лирического вступления: что вообще такое недоставленные сообщения? Представьте, что у вас есть очередь RabbitMQ, которая принимает задачи от продюсера и обрабатывается консьюмером. Иногда задачи не выполняются, например:

  • Сервер консьюмера не отвечает.
  • Ошибка в логике обработки задачи.
  • Отказ в сети между RabbitMQ и консьюмером.

Эти случаи могут привести к настоящему хаосу, если не учитывать такие задачи. Именно поэтому используются Dead-letter queues — так мы не теряем сообщения и можем их обработать позже.


Настройка DLQ (Dead-letter Queue) в RabbitMQ

Dead-letter queue — это по сути "корзина для мусора", куда RabbitMQ складывает сообщения, если они не были обработаны. Создадим такую очередь для нашего приложения.

1. Установка RabbitMQ и базовая конфигурация

Если RabbitMQ еще не запущен, установите и настройте его. Ваш любимый Docker легко справится с этим:


docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Теперь интерфейс RabbitMQ доступен на http://localhost:15672 (логин: guest, пароль: guest).

2. Создание новой очереди с DLQ

Определим две очереди: основную (my_queue) и dead-letter queue (dl_queue). Это можно сделать вручную через интерфейс RabbitMQ или программно.

Настройка очередей через Python


import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Настройка DLQ (Dead-letter queue)
channel.queue_declare(queue='dl_queue')

# Основная очередь с указанием DLQ
channel.queue_declare(
    queue='my_queue',
    arguments={
        'x-dead-letter-exchange': '',  # Обмен для DLQ
        'x-dead-letter-routing-key': 'dl_queue'  # Маршрутное ключевое слово для DLQ
    }
)

print("Очереди созданы!")
connection.close()

3. А что дальше?

Теперь, если сообщение не будет обработано в my_queue (например, консьюмер не подтвердит его обработку), оно автоматически попадет в dl_queue.


Пример использования DLQ в Celery

Переходим к Celery. Мы настроим задачу, которая иногда намеренно "падает" (ведь ошибки неизбежны, как зависимость программиста от кофе).

  1. Базовая настройка Celery

    Если вы еще не настроили Celery, вот пример минимальной конфигурации:

    
    from celery import Celery
    
    app = Celery('tasks', broker='pyamqp://guest@localhost//')  # RabbitMQ как брокер
    
  2. Задача, которая намеренно падает

    Добавим задачу, которая с вероятностью 50% будет выбрасывать исключение:

    
    import random
    
    @app.task(bind=True)
    def unreliable_task(self, x, y):
        if random.random() < 0.5:  # Вероятность ошибки
            raise RuntimeError("Случайная ошибка!")
        return x + y
    
  3. Настройка DLQ в Celery

    Celery автоматически использует DLQ RabbitMQ, если сообщение не обработано. Чтобы убедиться в этом, добавим задачу в очередь:

    
    result = unreliable_task.apply_async((2, 3), queue='my_queue')
    

    Запустите воркера Celery:

    
    celery -A tasks worker --loglevel=info
    

    Если задача выбросит исключение, сообщение можно найти в dl_queue.


Повторная обработка недоставленных сообщений

Теперь сделаем шаг вперед и реализуем логику повторной обработки сообщений из DLQ.

  1. "Чтение" сообщений из DLQ

    Мы подключимся к dl_queue и обработаем сообщения:

    
    import pika
    
    def process_dead_letters():
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channel = connection.channel()
    
        # Подключаемся к DLQ
        channel.queue_declare(queue='dl_queue')
    
        # Обработка сообщений
        def callback(ch, method, properties, body):
            print(f"Повторная обработка: {body.decode()}")
            ch.basic_ack(delivery_tag=method.delivery_tag)
    
        channel.basic_consume(queue='dl_queue', on_message_callback=callback)
    
        print("Ожидаем сообщений из DLQ...")
        channel.start_consuming()
    
    if __name__ == "__main__":
        process_dead_letters()
    
  2. Повторная отправка задач в основную очередь

    Теперь, после обработки сообщения из DLQ, мы можем возвращать его в основную очередь:

    
    def callback(ch, method, properties, body):
        print(f"Повторная обработка: {body.decode()}")
    
        # После обработки - вернуть в основную очередь
        ch.basic_publish(exchange='', routing_key='my_queue', body=body)
    
        # Подтверждаем обработку текущего сообщения
        ch.basic_ack(delivery_tag=method.delivery_tag)
    

    Этот код читает сообщения из dl_queue, повторно отправляет их в my_queue и подтверждает обработку.


Практическая реализация системы DLQ

Теперь представим, как это может использоваться в реальной жизни. Например, в e-commerce проекте:

  1. Задача: отправка email с подтверждением заказа.
  2. Если задача "падает" при отправке (например, по сетевой ошибке), она попадет в DLQ.
  3. Через некоторое время система автоматически извлекает сообщение из DLQ и пытается снова отправить email.

Заметки и типичные ошибки

  • Не забывайте подтверждать обработку сообщений. Если не вызывать basic_ack, сообщения застрянут в очереди.
  • Настраивайте время жизни сообщений в DLQ. Используйте аргумент x-message-ttl, чтобы сообщения не хранились вечно.
  • Избегайте "зацикливания". Если сообщение не удается обработать и после повторной отправки, добавьте логику, которая поместит его в архивную очередь или передаст администратору.

Заключение (шутка, его нет!)

Мы настроили Dead-letter queue, научились обрабатывать недоставленные сообщения и даже реализовали повторную обработку! Этот подход не только повышает надежность системы, но и доказывает, что даже программы могут исправить свои ошибки. Впереди еще больше увлекательной практики, так что продолжайте разрабатывать!

1
Задача
Модуль 4: FastAPI, 15 уровень, 4 лекция
Недоступна
Повторная обработка сообщений из Dead-letter Queue
Повторная обработка сообщений из Dead-letter Queue
1
Задача
Модуль 4: FastAPI, 15 уровень, 4 лекция
Недоступна
Автоматизация с использованием Celery
Автоматизация с использованием Celery
3
Опрос
Как обрабатывать ошибки в RabbitMQ и Celery, 15 уровень, 4 лекция
Недоступен
Как обрабатывать ошибки в RabbitMQ и Celery
Как обрабатывать ошибки в RabbitMQ и Celery
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ