Добро пожаловать на последний, самый важный аккорд главы про обработку ошибок и восстановление задач в асинхронных системах.
Сегодня мы соберем все, что изучали на предыдущих лекциях, в единую систему — как программист из Lego, только веселей.
В этой лекции мы не будем изучать новые механизмы, но вдоволь поработаем с их композицией и перенесем теорию в реальную практику.
Комплексная система управления ошибками включает в себя следующие элементы:
- Celery: платформа для управления задачами.
- RabbitMQ: очередь сообщений для передачи данных между продюсерами и консьюмерами.
- Dead-letter Queue (DLQ): место для сообщений, которые были обречены на провал.
- Логирование: полный трекинг задач и их статусов.
- Мониторинг: отслеживание производительности и ошибок в реальном времени.
- Механизм перезапуска задач: отправляет проваленные задачи на повторное выполнение (если это оправдано).
Схема работы
+-----------+ +-----------+ +-----------+ +-----------+
| Producer | ----> | RabbitMQ | ----> | Celery | ----> | Monitoring |
| (FastAPI) | | Queue | | Workers | | System |
+-----------+ +-----------+ +-----------+ +-----------+
|
v
+-------------+
| Dead Letter |
| Queue |
+-------------+
|
v
+-------------------+
| Retry Mechanism |
+-------------------+
Демонстрация: реальная комплексная система
Итак, мы создадим систему для обработки заказов в интернет-магазине. Система будет включать:
- Очередь RabbitMQ для обработки событий (например, "создать заказ").
- Celery для фонового выполнения задач.
- Логи, мониторинг и обработку ошибок.
Шаг 1: Инициализация проекта.
Создадим проект с FastAPI, подключим Celery и RabbitMQ.
# main.py — инициализация FastAPI приложения
from fastapi import FastAPI
from celery import Celery
app = FastAPI()
# Настройка Celery
celery_app = Celery(
"tasks",
broker="pyamqp://guest@localhost//",
backend="rpc://"
)
@app.post("/create-order/")
async def create_order(order_id: int):
"""API для создания заказа"""
celery_app.send_task("tasks.process_order", args=[order_id])
return {"status": "Order received", "order_id": order_id}
Шаг 2: Создание задач Celery.
Добавим задачу, которая может периодически падать, симулируя реальный мир.
# tasks.py — задачи Celery
import random
from celery import Celery
celery_app = Celery(
"tasks",
broker="pyamqp://guest@localhost//",
backend="rpc://"
)
@celery_app.task(bind=True, max_retries=5, default_retry_delay=10)
def process_order(self, order_id):
# Имитация случайной ошибки
if random.choice([True, False]):
raise Exception(f"Error processing order {order_id}")
print(f"Order {order_id} processed successfully")
Шаг 3: Настройка Dead-letter Queue.
Настроим DLQ для RabbitMQ. Открываем rabbitmq.conf:
# RabbitMQ.conf
dead-letter-exchange=dlx_exchange
Создаем новую очередь "failed-orders" для проваленных задач.
Шаг 4: Логирование ошибок.
Добавим логирование. Для простоты используем библиотеку logging.
# tasks.py — добавление логов
import logging
# Настроим логгер
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def process_order(self, order_id):
try:
if random.choice([True, False]):
raise Exception(f"Error processing order {order_id}")
logger.info(f"Order {order_id} processed successfully")
except Exception as e:
logger.error(f"Processing failed for Order {order_id}: {e}")
raise self.retry(exc=e)
Шаг 5: Мониторинг задач.
Добавляем поддержку мониторинга через Celery Flower. Установите Flower:
pip install flower
Запустите Flower:
celery -A tasks flower --port=5555
Теперь перейдите в браузер на http://localhost:5555, чтобы увидеть статистику задач.
Повторное выполнение задач
Здесь в дело вступает механизм retry. Если задача попадает в DLQ, ее можно повторно обработать. Например:
# tasks.py — повторная обработка задач из DLQ
@celery_app.task()
def retry_failed_task(order_id):
logger.info(f"Retrying order {order_id}")
return process_order(order_id)
Чтобы автоматически извлекать данные из DLQ, можно написать небольшой скрипт:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="failed-orders")
def retry_callback(ch, method, properties, body):
order_id = int(body)
retry_failed_task.delay(order_id)
channel.basic_consume("failed-orders", on_message_callback=retry_callback, auto_ack=True)
channel.start_consuming()
Реальная архитектура в действии
Теперь у нас есть:
- API, которое отправляет задачи в очередь.
- Задачи Celery, которые обрабатывают заказы, автоматически делают retry в случае ошибок и логируют результаты.
- Dead-letter Queue, где оседают "грешные" сообщения.
- Система мониторинга Flower, которая следит за выполнением задач.
Что делать в продакшене?
- Для логов используйте системы вроде Sentry.
- Настройте alert'ы: уведомления через Slack или Email при большом числе ошибок.
- Не забывайте про устойчивость: используйте Durable очереди и резервные копии.
Таким вот образом из маленьких кубиков — очередей, логов, мониторинга и retry — мы построили систему, которая умеет заботиться о данных, обрабатывать ошибки и продолжать работать, даже если что-то идет не так.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ