Ласкаво просимо на одну з най "життєвих" лекцій нашого курсу! Сьогодні поговоримо про те, як справлятись із ситуацією, коли повідомлення "губляться" в асинхронних системах. Розглянемо використання Dead-letter queues (DLQ) для їх обробки і реалізацію повторної доставки повідомлень. Готуйтесь розбиратись з реальними помилками і відновлювати порядок у нашому додатку. А тепер — в бій!
Почнемо з ліричного вступу: що взагалі таке недоставлені повідомлення? Уявіть, що у вас є черга RabbitMQ, яка приймає задачі від продюсера і обробляється консьюмером. Іноді задачі не виконуються, наприклад:
- Сервер консьюмера не відповідає.
- Помилка в логіці обробки задачі.
- Відмова в мережі між RabbitMQ і консьюмером.
Ці випадки можуть привести до справжнього хаосу, якщо не брати до уваги такі задачі. Саме тому використовуються Dead-letter queues — так ми не втрачаємо повідомлення і можемо їх обробити пізніше.
Налаштування DLQ (Dead-letter Queue) в RabbitMQ
Dead-letter queue — це по суті "кошик для сміття", куди RabbitMQ складає повідомлення, якщо вони не були оброблені. Створимо таку чергу для нашого додатку.
1. Встановлення RabbitMQ і базова конфігурація
Якщо RabbitMQ ще не запущений, встановіть і налаштуйте його. Ваш улюблений Docker легко з цим впорається:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
Тепер інтерфейс RabbitMQ доступний за адресою http://localhost:15672 (логін: guest, пароль: guest).
2. Створення нової черги з DLQ
Визначимо дві черги: основну (my_queue) і dead-letter queue (dl_queue). Це можна зробити вручну через інтерфейс RabbitMQ або програмно.
Налаштування черг через Python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Налаштування DLQ (Dead-letter queue)
channel.queue_declare(queue='dl_queue')
# Основна черга з вказівкою DLQ
channel.queue_declare(
queue='my_queue',
arguments={
'x-dead-letter-exchange': '', # Exchange для DLQ
'x-dead-letter-routing-key': 'dl_queue' # Routing key для DLQ
}
)
print("Черги створено!")
connection.close()
3. А що далі?
Тепер, якщо повідомлення не буде оброблене в my_queue (наприклад, консьюмер не підтвердить його обробку), воно автоматично потрапить в dl_queue.
Приклад використання DLQ в Celery
Переходимо до Celery. Налаштуємо задачу, яка іноді навмисно "падає" (бо помилки неминучі, як залежність програміста від кави).
- Базове налаштування Celery
Якщо ви ще не налаштували Celery, ось приклад мінімальної конфігурації:
from celery import Celery app = Celery('tasks', broker='pyamqp://guest@localhost//') # RabbitMQ як broker - Задача, яка навмисно падає
Додамо задачу, яка з ймовірністю 50% буде кидати виняток:
import random @app.task(bind=True) def unreliable_task(self, x, y): if random.random() < 0.5: # Ймовірність помилки raise RuntimeError("Випадкова помилка!") return x + y - Налаштування DLQ в Celery
Celery автоматично використовує DLQ RabbitMQ, якщо повідомлення не оброблене. Щоб переконатись у цьому, додамо задачу в чергу:
result = unreliable_task.apply_async((2, 3), queue='my_queue')Запустіть worker Celery:
celery -A tasks worker --loglevel=infoЯкщо задача кине виняток, повідомлення можна знайти в
dl_queue.
Повторна обробка недоставлених повідомлень
Тепер зробимо крок уперед і реалізуємо логіку повторної обробки повідомлень з DLQ.
- "Читання" повідомлень з DLQ
Ми підключимось до
dl_queueі обробимо повідомлення:import pika def process_dead_letters(): connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # Підключаємось до DLQ channel.queue_declare(queue='dl_queue') # Обробка повідомлень def callback(ch, method, properties, body): print(f"Повторна обробка: {body.decode()}") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(queue='dl_queue', on_message_callback=callback) print("Чекаємо повідомлень з DLQ...") channel.start_consuming() if __name__ == "__main__": process_dead_letters() - Повторна відправка задач в основну чергу
Тепер, після обробки повідомлення з DLQ, ми можемо повертати його в основну чергу:
def callback(ch, method, properties, body): print(f"Повторна обробка: {body.decode()}") # Після обробки - повернути в основну чергу ch.basic_publish(exchange='', routing_key='my_queue', body=body) # Підтверджуємо обробку поточного повідомлення ch.basic_ack(delivery_tag=method.delivery_tag)Цей код читає повідомлення з
dl_queue, повторно надсилає їх вmy_queueі підтверджує обробку.
Практична реалізація системи DLQ
Тепер уявімо, як це може використовуватись у реальному житті. Наприклад, в e-commerce проєкті:
- Задача: відправка email з підтвердженням замовлення.
- Якщо задача "впаде" при відправці (наприклад, через мережеву помилку), вона потрапить в DLQ.
- Через певний час система автоматично витягує повідомлення з DLQ і намагається ще раз відправити email.
Нотатки і типові помилки
- Не забувайте підтверджувати обробку повідомлень. Якщо не викликати
basic_ack, повідомлення застрягнуть у черзі. - Налаштовуйте час життя повідомлень в DLQ. Використовуйте аргумент
x-message-ttl, щоб повідомлення не зберігались вічно. - Уникайте "зациклювання". Якщо повідомлення не вдається обробити і після повторної відправки, додайте логіку, яка помістить його в архівну чергу або передасть адміністратору.
Висновок (жарт, його нема!)
Ми налаштували Dead-letter queue, навчилися обробляти недоставлені повідомлення і навіть реалізували повторну обробку! Цей підхід не тільки підвищує надійність системи, але й доводить, що навіть програми можуть виправити свої помилки. Попереду ще більше захопливої практики, тож продовжуйте розробляти!
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ