JavaRush /Курси /Модуль 4: FastAPI /Приклад обробки недоставлених повідомлень і їх повторної ...

Приклад обробки недоставлених повідомлень і їх повторної обробки

Модуль 4: FastAPI
Рівень 15 , Лекція 4
Відкрита

Ласкаво просимо на одну з най "життєвих" лекцій нашого курсу! Сьогодні поговоримо про те, як справлятись із ситуацією, коли повідомлення "губляться" в асинхронних системах. Розглянемо використання Dead-letter queues (DLQ) для їх обробки і реалізацію повторної доставки повідомлень. Готуйтесь розбиратись з реальними помилками і відновлювати порядок у нашому додатку. А тепер — в бій!

Почнемо з ліричного вступу: що взагалі таке недоставлені повідомлення? Уявіть, що у вас є черга RabbitMQ, яка приймає задачі від продюсера і обробляється консьюмером. Іноді задачі не виконуються, наприклад:

  • Сервер консьюмера не відповідає.
  • Помилка в логіці обробки задачі.
  • Відмова в мережі між RabbitMQ і консьюмером.

Ці випадки можуть привести до справжнього хаосу, якщо не брати до уваги такі задачі. Саме тому використовуються Dead-letter queues — так ми не втрачаємо повідомлення і можемо їх обробити пізніше.


Налаштування DLQ (Dead-letter Queue) в RabbitMQ

Dead-letter queue — це по суті "кошик для сміття", куди RabbitMQ складає повідомлення, якщо вони не були оброблені. Створимо таку чергу для нашого додатку.

1. Встановлення RabbitMQ і базова конфігурація

Якщо RabbitMQ ще не запущений, встановіть і налаштуйте його. Ваш улюблений Docker легко з цим впорається:


docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Тепер інтерфейс RabbitMQ доступний за адресою http://localhost:15672 (логін: guest, пароль: guest).

2. Створення нової черги з DLQ

Визначимо дві черги: основну (my_queue) і dead-letter queue (dl_queue). Це можна зробити вручну через інтерфейс RabbitMQ або програмно.

Налаштування черг через Python


import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Налаштування DLQ (Dead-letter queue)
channel.queue_declare(queue='dl_queue')

# Основна черга з вказівкою DLQ
channel.queue_declare(
    queue='my_queue',
    arguments={
        'x-dead-letter-exchange': '',  # Exchange для DLQ
        'x-dead-letter-routing-key': 'dl_queue'  # Routing key для DLQ
    }
)

print("Черги створено!")
connection.close()

3. А що далі?

Тепер, якщо повідомлення не буде оброблене в my_queue (наприклад, консьюмер не підтвердить його обробку), воно автоматично потрапить в dl_queue.


Приклад використання DLQ в Celery

Переходимо до Celery. Налаштуємо задачу, яка іноді навмисно "падає" (бо помилки неминучі, як залежність програміста від кави).

  1. Базове налаштування Celery

    Якщо ви ще не налаштували Celery, ось приклад мінімальної конфігурації:

    
    from celery import Celery
    
    app = Celery('tasks', broker='pyamqp://guest@localhost//')  # RabbitMQ як broker
    
  2. Задача, яка навмисно падає

    Додамо задачу, яка з ймовірністю 50% буде кидати виняток:

    
    import random
    
    @app.task(bind=True)
    def unreliable_task(self, x, y):
        if random.random() < 0.5:  # Ймовірність помилки
            raise RuntimeError("Випадкова помилка!")
        return x + y
    
  3. Налаштування DLQ в Celery

    Celery автоматично використовує DLQ RabbitMQ, якщо повідомлення не оброблене. Щоб переконатись у цьому, додамо задачу в чергу:

    
    result = unreliable_task.apply_async((2, 3), queue='my_queue')
    

    Запустіть worker Celery:

    
    celery -A tasks worker --loglevel=info
    

    Якщо задача кине виняток, повідомлення можна знайти в dl_queue.


Повторна обробка недоставлених повідомлень

Тепер зробимо крок уперед і реалізуємо логіку повторної обробки повідомлень з DLQ.

  1. "Читання" повідомлень з DLQ

    Ми підключимось до dl_queue і обробимо повідомлення:

    
    import pika
    
    def process_dead_letters():
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channel = connection.channel()
    
        # Підключаємось до DLQ
        channel.queue_declare(queue='dl_queue')
    
        # Обробка повідомлень
        def callback(ch, method, properties, body):
            print(f"Повторна обробка: {body.decode()}")
            ch.basic_ack(delivery_tag=method.delivery_tag)
    
        channel.basic_consume(queue='dl_queue', on_message_callback=callback)
    
        print("Чекаємо повідомлень з DLQ...")
        channel.start_consuming()
    
    if __name__ == "__main__":
        process_dead_letters()
    
  2. Повторна відправка задач в основну чергу

    Тепер, після обробки повідомлення з DLQ, ми можемо повертати його в основну чергу:

    
    def callback(ch, method, properties, body):
        print(f"Повторна обробка: {body.decode()}")
    
        # Після обробки - повернути в основну чергу
        ch.basic_publish(exchange='', routing_key='my_queue', body=body)
    
        # Підтверджуємо обробку поточного повідомлення
        ch.basic_ack(delivery_tag=method.delivery_tag)
    

    Цей код читає повідомлення з dl_queue, повторно надсилає їх в my_queue і підтверджує обробку.


Практична реалізація системи DLQ

Тепер уявімо, як це може використовуватись у реальному житті. Наприклад, в e-commerce проєкті:

  1. Задача: відправка email з підтвердженням замовлення.
  2. Якщо задача "впаде" при відправці (наприклад, через мережеву помилку), вона потрапить в DLQ.
  3. Через певний час система автоматично витягує повідомлення з DLQ і намагається ще раз відправити email.

Нотатки і типові помилки

  • Не забувайте підтверджувати обробку повідомлень. Якщо не викликати basic_ack, повідомлення застрягнуть у черзі.
  • Налаштовуйте час життя повідомлень в DLQ. Використовуйте аргумент x-message-ttl, щоб повідомлення не зберігались вічно.
  • Уникайте "зациклювання". Якщо повідомлення не вдається обробити і після повторної відправки, додайте логіку, яка помістить його в архівну чергу або передасть адміністратору.

Висновок (жарт, його нема!)

Ми налаштували Dead-letter queue, навчилися обробляти недоставлені повідомлення і навіть реалізували повторну обробку! Цей підхід не тільки підвищує надійність системи, але й доводить, що навіть програми можуть виправити свої помилки. Попереду ще більше захопливої практики, тож продовжуйте розробляти!

3
Опитування
Як обробляти помилки в RabbitMQ та Celery, рівень 15, лекція 4
Недоступний
Як обробляти помилки в RabbitMQ та Celery
Як обробляти помилки в RabbitMQ та Celery
Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ