JavaRush /Курси /Модуль 4: FastAPI /Приклад комплексної системи обробки помилок та відновленн...

Приклад комплексної системи обробки помилок та відновлення задач

Модуль 4: FastAPI
Рівень 15 , Лекція 9
Відкрита

Ласкаво просимо на останній, найважливіший акорд розділу про обробку помилок і відновлення задач в асинхронних системах.

Сьогодні ми зберемо все, що вивчали на попередніх лекціях, в єдину систему — як програміст з Lego, тільки веселіше.

У цій лекції ми не будемо вивчати нові механізми, але докладно попрацюємо з їхньою композицією і перенесемо теорію в реальну практику.

Комплексна система управління помилками включає в себе такі елементи:

  1. Celery: платформа для управління задачами.
  2. RabbitMQ: черга повідомлень для передачі даних між продюсерами і консьюмерами.
  3. Dead-letter Queue (DLQ): місце для повідомлень, які були приречені на провал.
  4. Логування: повний трекінг задач і їхніх статусів.
  5. Моніторинг: відстеження продуктивності і помилок в реальному часі.
  6. Механізм перезапуску задач: надсилає провалені задачі на повторне виконання (якщо це виправдано).

Схема роботи


+-----------+       +-----------+       +-----------+       +-----------+
| Producer  | ----> | RabbitMQ  | ----> | Celery    | ----> | Monitoring |
| (FastAPI) |       |  Queue    |       | Workers   |       |  System    |
+-----------+       +-----------+       +-----------+       +-----------+
                        |
                        v
                 +-------------+
                 | Dead Letter |
                 |    Queue    |
                 +-------------+
                      |
                      v
            +-------------------+
            | Retry Mechanism   |
            +-------------------+

Демонстрація: реальна комплексна система

Отже, створимо систему для обробки замовлень в інтернет‑магазині. Система включатиме:

  • Чергу RabbitMQ для обробки подій (наприклад, "створити замовлення").
  • Celery для фоновго виконання задач.
  • Логи, моніторинг і обробку помилок.

Крок 1: Ініціалізація проєкту.

Створимо проєкт з FastAPI, підключимо Celery і RabbitMQ.


# main.py — ініціалізація FastAPI застосунку
from fastapi import FastAPI
from celery import Celery

app = FastAPI()

# Налаштування Celery
celery_app = Celery(
    "tasks",
    broker="pyamqp://guest@localhost//",
    backend="rpc://"
)

@app.post("/create-order/")
async def create_order(order_id: int):
    """API для створення замовлення"""
    celery_app.send_task("tasks.process_order", args=[order_id])
    return {"status": "Order received", "order_id": order_id}

Крок 2: Створення задач Celery.

Додамо задачу, яка може періодично падати, симулюючи реальний світ.


# tasks.py — задачі Celery
import random
from celery import Celery

celery_app = Celery(
    "tasks",
    broker="pyamqp://guest@localhost//",
    backend="rpc://"
)

@celery_app.task(bind=True, max_retries=5, default_retry_delay=10)
def process_order(self, order_id):
    # Імітація випадкової помилки
    if random.choice([True, False]):
        raise Exception(f"Error processing order {order_id}")
    print(f"Order {order_id} processed successfully")

Крок 3: Налаштування Dead-letter Queue.

Налаштуємо DLQ для RabbitMQ. Відкриваємо rabbitmq.conf:


# RabbitMQ.conf
dead-letter-exchange=dlx_exchange

Створюємо нову чергу "failed-orders" для провалених задач.

Крок 4: Логування помилок.

Додамо логування. Для простоти використовуємо бібліотеку logging.


# tasks.py — додавання логів
import logging

# Налаштуємо логер
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def process_order(self, order_id):
    try:
        if random.choice([True, False]):
            raise Exception(f"Error processing order {order_id}")
        logger.info(f"Order {order_id} processed successfully")
    except Exception as e:
        logger.error(f"Processing failed for Order {order_id}: {e}")
        raise self.retry(exc=e)

Крок 5: Моніторинг задач.

Додаємо підтримку моніторингу через Celery Flower. Встановіть Flower:


pip install flower

Запустіть Flower:


celery -A tasks flower --port=5555

Тепер перейдіть у браузер на http://localhost:5555, щоб побачити статистику задач.


Повторне виконання задач

Тут у справу вступає механізм retry. Якщо задача потрапляє в DLQ, її можна повторно обробити. Наприклад:


# tasks.py — повторна обробка задач з DLQ

@celery_app.task()
def retry_failed_task(order_id):
    logger.info(f"Retrying order {order_id}")
    return process_order(order_id)

Щоб автоматично діставати дані з DLQ, можна написати невеликий скрипт:


import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

channel.queue_declare(queue="failed-orders")

def retry_callback(ch, method, properties, body):
    order_id = int(body)
    retry_failed_task.delay(order_id)

channel.basic_consume("failed-orders", on_message_callback=retry_callback, auto_ack=True)
channel.start_consuming()

Реальна архітектура в дії

Тепер у нас є:

  1. API, яке надсилає задачі в чергу.
  2. Задачі Celery, які обробляють замовлення, автоматично роблять retry у випадку помилок і логують результати.
  3. Dead-letter Queue, де осідають "грішні" повідомлення.
  4. Система моніторингу Flower, яка слідкує за виконанням задач.

Що робити в продакшені?

  • Для логів використовуйте системи на кшталт Sentry.
  • Налаштуйте alert-и: повідомлення через Slack або Email при великій кількості помилок.
  • Не забувайте про стійкість: використовуйте Durable черги та резервні копії.

Отак з маленьких кубиків — черг, логів, моніторингу і retry — ми побудували систему, яка вміє дбати про дані, обробляти помилки і продовжувати працювати, навіть якщо щось іде не так.

3
Опитування
Управління затримками та таймаутами при роботі з чергами, рівень 15, лекція 9
Недоступний
Управління затримками та таймаутами при роботі з чергами
Управління затримками та таймаутами при роботі з чергами
Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ