JavaRush /Курси /Модуль 4: FastAPI /Приклад створення асинхронної системи взаємодії через Rab...

Приклад створення асинхронної системи взаємодії через RabbitMQ

Модуль 4: FastAPI
Рівень 12 , Лекція 7
Відкрита

Якщо ти дійшов до цієї лекції, то, найімовірніше, у тебе вже є базові знання про те, що таке RabbitMQ, як створювати черги, продюсерів і консьюмерів, і навіть як працювати з ними асинхронно. Сьогодні ми зберемо всі шматки пазлу разом, щоб зробити робочу асинхронну систему взаємодії з використанням FastAPI і RabbitMQ. Налаштуйся на практику: буде багато коду, фану і, звісно, трохи кроликів. 🐇

Давай розберемося, як виглядатиме наша система. Уяви, що в нас є застосунок, у якому користувач відправляє запити через API (наприклад, для завантаження й обробки файлу). FastAPI приймає цей запит і публікує задачу в чергу RabbitMQ. Консьюмер з іншого сервісу забирає цю задачу, обробляє її і зберігає результат.

Ось схема взаємодії:


Клієнт -> FastAPI (продюсер) -> RabbitMQ -> Консьюмер -> Обробка задачі -> Результат

Додамо асинхронності, щоб система могла обробляти купу задач паралельно. Ну що, готовий? Поїхали писати код! 💻


Налаштування проєкту

Для початку переконайся, що у тебе встановлено:

  • Python 3.10+.
  • RabbitMQ (пам'ятай, що в попередніх лекціях ми його вже налаштовували).
  • Бібліотека pika для роботи з RabbitMQ (якщо ще не встановив: pip install pika).
  • FastAPI і Uvicorn (pip install fastapi uvicorn).

Створимо проєкт з такою структурою:


project/
├── producer/
│   ├── app.py
│   └── producer.py
├── consumer/
│   ├── worker.py
│   └── processor.py
└── requirements.txt

Продюсер: відправка повідомлень

Почнемо зі створення продюсера в producer/producer.py. Це та частина, яка відправляє задачі в RabbitMQ.


import pika

def publish_message(message: str):
    """Функція для відправки повідомлення в чергу"""
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    
    # Оголошуємо чергу (створюється, якщо ще немає)
    channel.queue_declare(queue='task_queue', durable=True)
    
    # Відправка повідомлення
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # Робить повідомлення стійким
        )
    )
    
    print(f"[x] Надіслано: {message}")
    connection.close()

Тепер додамо API в producer/app.py, щоб користувач міг відправляти задачі через HTTP-запити:


from fastapi import FastAPI
from producer import publish_message

app = FastAPI()

@app.post("/send-task/")
async def send_task(task: str):
    publish_message(task)
    return {"status": "Task sent", "task": task}

Запускаємо сервер FastAPI:


uvicorn producer.app:app --reload

Спробуй відправити запит на /send-task/ через Postman або Curl:


curl -X POST "http://127.0.0.1:8000/send-task/" -H "Content-Type: application/json" -d '{"task": "Hello, RabbitMQ!"}'

На цьому етапі наш сервер вже вміє відправляти задачі в RabbitMQ. 🔥

Консьюмер: обробка повідомлень

Переходимо до створення консьюмера в consumer/worker.py. Його завдання — забрати повідомлення з черги і обробити.


import pika
import time

def callback(ch, method, properties, body):
    """Функція обробки задачі"""
    print(f"[x] Отримано повідомлення: {body.decode()}")
    time.sleep(2)  # Замоделюємо тривалу обробку
    print("[x] Завдання виконане")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # Підтвердження обробки

def start_worker():
    """Запуск воркера"""
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='task_queue', durable=True)

    print("[*] Очікую повідомлень. Для виходу натисніть CTRL+C")
    channel.basic_qos(prefetch_count=1)  # Обробка лише 1 задачі одночасно
    channel.basic_consume(queue='task_queue', on_message_callback=callback)

    channel.start_consuming()

if __name__ == "__main__":
    start_worker()

Запускаємо воркер:


python consumer/worker.py

Асинхронність і масштабованість

Тепер налаштуємо асинхронність. Додай кілька воркерів, щоб обробляти задачі паралельно. Просто відкрий кілька терміналів і запусти worker.py в кожному з них. RabbitMQ сам розподілить задачі між воркерами завдяки налаштуванню basic_qos.

Тестування системи

  1. Відправ кілька задач через /send-task/ — кожне повідомлення буде поставлене в чергу.
  2. Слідкуй за логами в консолі воркера. Якщо запущено кілька воркерів, помітиш, як задачі розподіляються між ними. Це так званий Round-Robin.

🌟 Реальне застосування

Де можна використовувати таку систему:

  • Обробка зображень або відео.
  • Фонові розрахунки, наприклад, відправка email або SMS.
  • Побудова системи сповіщень.
  • І багато іншого: у мікросервісах цей підхід просто незамінний.

Особливості управління

Кілька порад для управління системою:

  1. Використовуй стійкі черги (durable=True), щоб дані не губилися при збоях.
  2. Налаштуй механізм підтвердження обробки повідомлень за допомогою basic_ack.
  3. Для моніторингу RabbitMQ використовуй вбудовану панель керування (http://localhost:15672, якщо ти вже її налаштовував раніше).

На цьому наша асинхронна система готова. Далі можна її тестувати, оптимізувати продуктивність і масштабувати. А якщо в тебе є ідеї, як покращити цю систему — дерзай, світу потрібні нові програмісти! 🐍✨

Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ