Ласкаво просимо на останній, найважливіший акорд розділу про обробку помилок і відновлення задач в асинхронних системах.
Сьогодні ми зберемо все, що вивчали на попередніх лекціях, в єдину систему — як програміст з Lego, тільки веселіше.
У цій лекції ми не будемо вивчати нові механізми, але докладно попрацюємо з їхньою композицією і перенесемо теорію в реальну практику.
Комплексна система управління помилками включає в себе такі елементи:
- Celery: платформа для управління задачами.
- RabbitMQ: черга повідомлень для передачі даних між продюсерами і консьюмерами.
- Dead-letter Queue (DLQ): місце для повідомлень, які були приречені на провал.
- Логування: повний трекінг задач і їхніх статусів.
- Моніторинг: відстеження продуктивності і помилок в реальному часі.
- Механізм перезапуску задач: надсилає провалені задачі на повторне виконання (якщо це виправдано).
Схема роботи
+-----------+ +-----------+ +-----------+ +-----------+
| Producer | ----> | RabbitMQ | ----> | Celery | ----> | Monitoring |
| (FastAPI) | | Queue | | Workers | | System |
+-----------+ +-----------+ +-----------+ +-----------+
|
v
+-------------+
| Dead Letter |
| Queue |
+-------------+
|
v
+-------------------+
| Retry Mechanism |
+-------------------+
Демонстрація: реальна комплексна система
Отже, створимо систему для обробки замовлень в інтернет‑магазині. Система включатиме:
- Чергу RabbitMQ для обробки подій (наприклад, "створити замовлення").
- Celery для фоновго виконання задач.
- Логи, моніторинг і обробку помилок.
Крок 1: Ініціалізація проєкту.
Створимо проєкт з FastAPI, підключимо Celery і RabbitMQ.
# main.py — ініціалізація FastAPI застосунку
from fastapi import FastAPI
from celery import Celery
app = FastAPI()
# Налаштування Celery
celery_app = Celery(
"tasks",
broker="pyamqp://guest@localhost//",
backend="rpc://"
)
@app.post("/create-order/")
async def create_order(order_id: int):
"""API для створення замовлення"""
celery_app.send_task("tasks.process_order", args=[order_id])
return {"status": "Order received", "order_id": order_id}
Крок 2: Створення задач Celery.
Додамо задачу, яка може періодично падати, симулюючи реальний світ.
# tasks.py — задачі Celery
import random
from celery import Celery
celery_app = Celery(
"tasks",
broker="pyamqp://guest@localhost//",
backend="rpc://"
)
@celery_app.task(bind=True, max_retries=5, default_retry_delay=10)
def process_order(self, order_id):
# Імітація випадкової помилки
if random.choice([True, False]):
raise Exception(f"Error processing order {order_id}")
print(f"Order {order_id} processed successfully")
Крок 3: Налаштування Dead-letter Queue.
Налаштуємо DLQ для RabbitMQ. Відкриваємо rabbitmq.conf:
# RabbitMQ.conf
dead-letter-exchange=dlx_exchange
Створюємо нову чергу "failed-orders" для провалених задач.
Крок 4: Логування помилок.
Додамо логування. Для простоти використовуємо бібліотеку logging.
# tasks.py — додавання логів
import logging
# Налаштуємо логер
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def process_order(self, order_id):
try:
if random.choice([True, False]):
raise Exception(f"Error processing order {order_id}")
logger.info(f"Order {order_id} processed successfully")
except Exception as e:
logger.error(f"Processing failed for Order {order_id}: {e}")
raise self.retry(exc=e)
Крок 5: Моніторинг задач.
Додаємо підтримку моніторингу через Celery Flower. Встановіть Flower:
pip install flower
Запустіть Flower:
celery -A tasks flower --port=5555
Тепер перейдіть у браузер на http://localhost:5555, щоб побачити статистику задач.
Повторне виконання задач
Тут у справу вступає механізм retry. Якщо задача потрапляє в DLQ, її можна повторно обробити. Наприклад:
# tasks.py — повторна обробка задач з DLQ
@celery_app.task()
def retry_failed_task(order_id):
logger.info(f"Retrying order {order_id}")
return process_order(order_id)
Щоб автоматично діставати дані з DLQ, можна написати невеликий скрипт:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="failed-orders")
def retry_callback(ch, method, properties, body):
order_id = int(body)
retry_failed_task.delay(order_id)
channel.basic_consume("failed-orders", on_message_callback=retry_callback, auto_ack=True)
channel.start_consuming()
Реальна архітектура в дії
Тепер у нас є:
- API, яке надсилає задачі в чергу.
- Задачі Celery, які обробляють замовлення, автоматично роблять retry у випадку помилок і логують результати.
- Dead-letter Queue, де осідають "грішні" повідомлення.
- Система моніторингу Flower, яка слідкує за виконанням задач.
Що робити в продакшені?
- Для логів використовуйте системи на кшталт Sentry.
- Налаштуйте alert-и: повідомлення через Slack або Email при великій кількості помилок.
- Не забувайте про стійкість: використовуйте Durable черги та резервні копії.
Отак з маленьких кубиків — черг, логів, моніторингу і retry — ми побудували систему, яка вміє дбати про дані, обробляти помилки і продовжувати працювати, навіть якщо щось іде не так.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ