JavaRush /Курси /Модуль 4: FastAPI /Як обробляти помилки в RabbitMQ і Celery

Як обробляти помилки в RabbitMQ і Celery

Модуль 4: FastAPI
Рівень 15 , Лекція 1
Відкрита

Почнемо з RabbitMQ, який, як добрий поштовий голуб, доставляє повідомлення вашому consumer. Але що робити, якщо голуб втомився і впустив лист?

Підтвердження повідомлень (Acknowledgments)

RabbitMQ дозволяє виставити механізми підтверджень, завдяки яким ви не втратите повідомлення, навіть якщо consumer вирішить піти у відпустку:

  • Підтвердження повідомлення вручну: consumer явно повідомляє RabbitMQ, що обробив повідомлення успішно. Це зменшує ймовірність втрати даних.
  • Відмова від повідомлення (Reject): якщо задача не виконана, RabbitMQ може або повторно відправити повідомлення іншому consumer, або перемістити його у спеціальну чергу.

Приклад налаштування підтверджень:


import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Оголошуємо чергу
channel.queue_declare(queue='example_queue', durable=True)

# Функція обробки повідомлень
def callback(ch, method, properties, body):
    print(f"Отримано повідомлення: {body}")
    try:
        # Тут обробляємо повідомлення
        do_processing(body) 
        # Підтверджуємо, що обробка завершена успішно
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Помилка: {e}")
        # Відмовляємося від повідомлення
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

channel.basic_consume(
    queue='example_queue', 
    on_message_callback=callback
)

print('Очікування повідомлень...')
channel.start_consuming()

Тут, якщо обробка повідомлення завершилася невдало, ми явно повідомляємо RabbitMQ, що повідомлення не оброблене, щоб воно або пішло в Dead-letter queue, або просто відкинулося.

Dead-letter Exchange (DLX)

Для повідомлень, які неможливо доставити або обробити, RabbitMQ підтримує Dead-letter Exchange. Це, по суті, "кладовище" повідомлень, де вони чекають повторного використання або аналізу.

Налаштування DLX:

  1. Створюємо exchange для "мертвих" повідомлень.
  2. Прив'язуємо його до нової черги.
  3. Налаштовуємо вихідну чергу на використання DLX.

Приклад:


# Створюємо "кладовище" для повідомлень
rabbitmqadmin declare exchange name=dlx_exchange type=direct
rabbitmqadmin declare queue name=dlx_queue durable=true
rabbitmqadmin declare binding source=dlx_exchange destination=dlx_queue

# Налаштовуємо вихідну чергу на використання DLX
rabbitmqadmin declare queue name=example_queue durable=true arguments='{"x-dead-letter-exchange":"dlx_exchange"}'

Тепер, якщо повідомлення не буде оброблене, воно автоматично потрапить у dlx_queue.


Обробка помилок в Celery

Тепер перейдемо до Celery. Якщо RabbitMQ — це листоноша, то Celery — це майстер на всі руки, який виконує задачі (і іноді помиляється). Тут поговоримо про перезапуски задач (retry) і інші способи обробки помилок.

Моделі RetryPolicy для повторного виконання задач

Celery підтримує вбудовані механізми перезапуску задач. Наприклад, якщо задача не виконалася через тимчасову помилку (наприклад, мережа була недоступна), ви можете спробувати виконати її знову через деякий час.

Приклад простого retry:


from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task(bind=True, default_retry_delay=5 * 60, max_retries=3)  # Повтор через 5 хвилин, максимум 3 рази
def my_task(self, arg):
    try:
        # Виконуємо задачу
        do_something(arg)
    except Exception as exc:
        # Якщо помилка, пробуємо ще раз
        raise self.retry(exc=exc)

Особливості конфігурації retry:

  • default_retry_delay: час між спробами.
  • max_retries: максимальна кількість спроб.
  • Використовуйте self.retry замість стандартного try-except, щоб Celery автоматично обробив повтор.

Таймаути задач

Ви також можете налаштувати максимальний час виконання задачі. Наприклад, якщо задача "застрягла", її виконання має автоматично перерватися.

Приклад:


@app.task(time_limit=30)  # Задача буде примусово завершена через 30 секунд
def long_running_task(arg):
    # Ваша довга задача
    do_some_long_work(arg)

Як Celery і RabbitMQ працюють разом для обробки помилок

Щоб зв'язати Celery і RabbitMQ в одну ефективну систему, треба налаштувати:

  1. Підтвердження від Celery: Celery автоматично відправляє підтвердження в RabbitMQ про виконання задач.
  2. Використання DLX: зв'яжіть RabbitMQ і Celery так, щоб невдалі задачі відправлялися в Dead-letter queue.

Приклад:

У конфігурації Celery вкажіть параметр:


app.conf.task_acks_late = True  # Підтвердження тільки після успішного виконання задачі
app.conf.task_reject_on_worker_lost = True  # Повернення задачі в чергу при збої worker-а

Тепер, якщо worker "помер" під час виконання задачі, RabbitMQ поверне її назад у чергу або відправить в DLX.


Практичний приклад: відновлення задач після помилок

Давайте підсумуємо:

  1. Налаштовуємо RabbitMQ для роботи з DLX.
  2. Створюємо задачу Celery з retry і логуванням помилок.
  3. Додаємо таймаути для задач.

# Конфігуруємо Celery
app.conf.update(
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    worker_disable_rate_limits=True
)

# Налаштовуємо задачу з обробкою помилок
@app.task(bind=True, default_retry_delay=10, max_retries=5, time_limit=30)
def process_data(self, data):
    try:
        # Логіка задачі
        result = do_important_work(data)
        return result
    except ConnectionError as exc:
        self.retry(exc=exc)  # Пробуємо ще при мережевій помилці
    except Exception as exc:
        # Логуємо інші помилки
        logger.error(f"Помилка обробки: {exc}")
        raise

Якщо задача не змогла виконатися 5 разів, вона потрапить в DLX, налаштований раніше в RabbitMQ.


Важливі моменти

  • Налаштування підтверджень і DLX в RabbitMQ допомагає уникнути втрати повідомлень.
  • Використання retry і таймаутів в Celery робить систему більш стійкою.
  • Правильне логування всіх помилок — ваш найкращий друг. Використовуйте зовнішні інструменти, такі як Sentry, для моніторингу.

Ну що, тепер помилки — не вороги, а просто частина вашої роботи. Як казав великий Боб: "Ця помилка не моя, я її просто наслідую!". Перевіряйте свій код, налаштовуйте обробку помилок, і нехай ваш асинхронний код працює як годинник (але не падає!).


Рекомендована документація:

Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ