Начнем с RabbitMQ, который, как добрый почтовый голубь, доставляет сообщения вашему потребителю (consumer). Но что делать, если голубь устал и выронил письмо?
Подтверждения сообщений (Acknowledgments)
RabbitMQ позволяет установить механизмы подтверждений, благодаря которым вы не потеряете сообщения, даже если потребитель решит устроить себе отпуск:
- Подтверждение сообщения вручную: потребитель явно сообщает RabbitMQ, что обработал сообщение успешно. Это снижает вероятность потери данных.
- Отказ сообщения (Reject): если задача не выполнена, RabbitMQ может либо повторно отправить сообщение другому потребителю, либо переместить его в специальную очередь.
Пример настройки подтверждений:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Объявляем очередь
channel.queue_declare(queue='example_queue', durable=True)
# Функция обработки сообщений
def callback(ch, method, properties, body):
print(f"Получено сообщение: {body}")
try:
# Здесь обрабатываем сообщение
do_processing(body)
# Подтверждаем, что обработка завершена успешно
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"Ошибка: {e}")
# Отказываться от сообщения
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
channel.basic_consume(
queue='example_queue',
on_message_callback=callback
)
print('Ожидание сообщений...')
channel.start_consuming()
Здесь, если обработка сообщения завершилась неудачно, мы явно сообщаем RabbitMQ, что сообщение не обработано, чтобы оно либо ушло в Dead-letter queue, либо просто отбросилось.
Dead-letter Exchange (DLX)
Для сообщений, которые невозможно доставить или обработать, RabbitMQ поддерживает Dead-letter Exchange. Это, по сути, "кладбище" сообщений, где они ждут повторного использования или анализа.
Настройка DLX:
- Создаем exchange для "мертвых" сообщений.
- Привязываем его к новой очереди.
- Настраиваем исходную очередь на использование DLX.
Пример:
# Создаем "кладбище" для сообщений
rabbitmqadmin declare exchange name=dlx_exchange type=direct
rabbitmqadmin declare queue name=dlx_queue durable=true
rabbitmqadmin declare binding source=dlx_exchange destination=dlx_queue
# Настраиваем исходную очередь на использование DLX
rabbitmqadmin declare queue name=example_queue durable=true arguments='{"x-dead-letter-exchange":"dlx_exchange"}'
Теперь, если сообщение не будет обработано, оно автоматически попадет в dlx_queue.
Обработка ошибок в Celery
Теперь перейдем к Celery. Если RabbitMQ — это почтальон, то Celery — это мастер на все руки, который выполняет задачи (и иногда ошибается). Здесь мы поговорим о перезапусках задач (retry) и других способах обработки ошибок.
Модели RetryPolicy для повторного выполнения задач
Celery поддерживает встроенные механизмы перезапуска задач. Например, если задача не выполнилась из-за временной ошибки (например, сеть была недоступна), вы можете попробовать выполнить её снова через некоторое время.
Пример простого retry:
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task(bind=True, default_retry_delay=5 * 60, max_retries=3) # Повтор через 5 минут, максимум 3 раза
def my_task(self, arg):
try:
# Выполняем задачу
do_something(arg)
except Exception as exc:
# Если ошибка, пробуем ещё раз
raise self.retry(exc=exc)
Особенности конфигурации retry:
default_retry_delay: время между попытками.max_retries: максимальное количество попыток.- Используйте
self.retryвместо стандартногоtry-except, чтобы Celery автоматически обработал повтор.
Таймауты задач
Вы также можете настроить максимальное время выполнения задачи. Например, если задача "застряла", её выполнение должно автоматически прерваться.
Пример:
@app.task(time_limit=30) # Задача будет принудительно завершена через 30 секунд
def long_running_task(arg):
# Ваша длительная задача
do_some_long_work(arg)
Как Celery и RabbitMQ работают вместе для обработки ошибок
Чтобы связать Celery и RabbitMQ в одну эффективную систему, вам нужно настроить:
- Подтверждения от Celery: Celery автоматически отправляет подтверждения в RabbitMQ о выполнении задач.
- Использование DLX: свяжите RabbitMQ и Celery так, чтобы неудачные задачи отправлялись в Dead-letter queue.
Пример:
В конфигурации Celery укажите параметр:
app.conf.task_acks_late = True # Подтверждение только после успешного выполнения задачи
app.conf.task_reject_on_worker_lost = True # Передача задачи обратно в очередь при сбое воркера
Теперь, если воркер "умер" во время выполнения задачи, RabbitMQ вернёт её обратно в очередь или отправит в DLX.
Практический пример: восстановление задач после ошибок
Давайте соберем воедино:
- Настраиваем RabbitMQ для работы с DLX.
- Создаём задачу Celery с retry и логированием ошибок.
- Добавляем таймауты на задачи.
# Конфигурируем Celery
app.conf.update(
task_acks_late=True,
task_reject_on_worker_lost=True,
worker_disable_rate_limits=True
)
# Настраиваем задачу с обработкой ошибок
@app.task(bind=True, default_retry_delay=10, max_retries=5, time_limit=30)
def process_data(self, data):
try:
# Логика задачи
result = do_important_work(data)
return result
except ConnectionError as exc:
self.retry(exc=exc) # Пробуем снова при сетевой ошибке
except Exception as exc:
# Логгируем другие ошибки
logger.error(f"Ошибка обработки: {exc}")
raise
Если задача не смогла выполниться 5 раз, она попадёт в DLX, настроенный ранее в RabbitMQ.
Важные моменты
- Настройка подтверждений и DLX в RabbitMQ помогает избежать потери сообщений.
- Использование retry и таймаутов в Celery делает систему более устойчивой.
- Правильное логирование всех ошибок — ваш лучший друг. Используйте внешние инструменты, такие как Sentry, для мониторинга.
Ну что, теперь ошибки — не враги, а просто часть вашей работы. Как говорил великий Боб: "Эта ошибка не моя, я её просто наследую!". Проверяйте свой код, настраивайте обработку ошибок, и пусть ваш асинхронный код работает как часы (но не падает!).
Рекомендуемая документация:
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ