JavaRush /Курси /Модуль 4: FastAPI /Асинхронна робота з чергами повідомлень у FastAPI

Асинхронна робота з чергами повідомлень у FastAPI

Модуль 4: FastAPI
Рівень 12 , Лекція 5
Відкрита

Створення структури FastAPI-проєкту

Для початку встановимо aio-pika:

pip install aio-pika

Переконайтеся, що RabbitMQ вже встановлений і запущений. Якщо ви пропустили цей етап, поверніться до лекції 63, щоб налаштувати сервер.

Тепер створимо базовий проєкт FastAPI. Наша структура буде виглядати так:

fastapi_rabbitmq/
├── app/
│   ├── main.py  # Головна точка входу
│   ├── producer.py  # Асинхронний продюсер
│   ├── consumer.py  # Асинхронний консьюмер
│   ├── config.py  # Конфігурація додатку
├── Dockerfile  # У майбутньому додамо підтримку Docker

Крок 1: налаштування конфігурації

Почнемо з файлу config.py для підключення RabbitMQ:


# app/config.py

import os

RABBITMQ_URL = os.getenv("RABBITMQ_URL", "amqp://guest:guest@localhost/")
QUEUE_NAME = "example_queue"

Ми вказали URL RabbitMQ і назву черги, яку будемо використовувати.

Крок 2: створення продюсера

Тепер створимо продюсера для відправки повідомлень. Це буде невеликий модуль producer.py:


# app/producer.py

import aio_pika
from app.config import RABBITMQ_URL, QUEUE_NAME


async def send_message(message: str):
    # Встановлюємо з'єднання з RabbitMQ
    connection = await aio_pika.connect_robust(RABBITMQ_URL)
    async with connection:
        # Створюємо канал
        channel = await connection.channel()

        # Декларація черги
        queue = await channel.declare_queue(QUEUE_NAME, durable=True)

        # Відправляємо повідомлення в чергу
        await channel.default_exchange.publish(
            aio_pika.Message(body=message.encode()),
            routing_key=queue.name,
        )
        print(f"Повідомлення відправлено: {message}")

Цей код робить наступне:

  1. Встановлює асинхронне з'єднання з RabbitMQ.
  2. Створює канал зв'язку і декларацію черги.
  3. Відправляє повідомлення в вказану чергу.

Крок 3: створення консьюмера

Тепер перейдемо до споживача, який буде обробляти повідомлення з черги:


# app/consumer.py

import aio_pika
from app.config import RABBITMQ_URL, QUEUE_NAME


async def consume_messages():
    # Встановлюємо з'єднання з RabbitMQ
    connection = await aio_pika.connect_robust(RABBITMQ_URL)
    async with connection:
        # Створюємо канал
        channel = await connection.channel()

        # Декларація черги
        queue = await channel.declare_queue(QUEUE_NAME, durable=True)

        # Обробник повідомлень
        async def on_message(message: aio_pika.abc.AbstractIncomingMessage):
            async with message.process():
                print(f"Повідомлення отримано: {message.body.decode()}")

        # Підписуємося на чергу
        await queue.consume(on_message)
        print("Консьюмер слухає чергу...")

        # Утримуємо підключення відкритим
        await connection.connected.wait()

Цей скрипт:

  1. Підключається до RabbitMQ.
  2. Створює чергу і підписується на неї.
  3. Обробляє вхідні повідомлення асинхронно.

Крок 4: інтеграція з FastAPI

Ми зв'язали продюсера і консьюмера з RabbitMQ, тепер поєднаємо це з FastAPI. Почнемо з основного файлу main.py:


# app/main.py

from fastapi import FastAPI, BackgroundTasks
from app.producer import send_message
from app.consumer import consume_messages
import asyncio

app = FastAPI()


@app.on_event("startup")
async def startup_event():
    # Запускаємо консьюмера у фоновому режимі при старті додатку
    asyncio.create_task(consume_messages())


@app.post("/send/")
async def send_to_queue(content: str, background_tasks: BackgroundTasks):
    # Відправляємо повідомлення в чергу
    background_tasks.add_task(send_message, content)
    return {"status": "Повідомлення відправлено!"}

Тут:

  1. Під час запуску додатку ми запускаємо консьюмера у фоні.
  2. API /send/ дозволяє відправляти повідомлення в чергу.

Запуск сервісу

Запустіть ваш FastAPI-додаток:

uvicorn app.main:app --reload

Тепер ви можете:

  • Відправляти повідомлення за допомогою API:
    curl -X POST "http://127.0.0.1:8000/send/" -H "Content-Type: application/json" -d '"Привіт, RabbitMQ!"'
    
  • Спостерігати, як консьюмер приймає і обробляє їх.

Асинхронність: навіщо вона потрібна і що може піти не так

Уявіть сучасну логістику: на складі приймальник приймає коробки, сканує, відправляє на конвеєр — і береться за наступну. Він не чекає, поки приїде вантажівка, щоб забрати першу коробку. Конвеєр — це наша черга повідомлень, а вантажівки — це консьюмери, які розбирають завдання, коли в них є ресурси.

Асинхронна взаємодія дозволяє системі бути гнучкою і не залежати від того, наскільки швидко хтось на іншому кінці готовий прийняти задачу. Завдання «їдуть» по черзі, розвантажуються поступово і без заторів, а ви тим часом спокійно займаєтеся іншими речами. Це особливо важливо, коли завдань багато і час критичний.

Але при всій цій красі треба пам'ятати про підводні камені. Надійне підключення до RabbitMQ — мастхев, особливо в проді. Використовуйте aio-pika.connect_robust, щоб додаток міг перепідключитися при збої. Також не забувайте відправляти підтвердження (ack) після обробки повідомлень — інакше RabbitMQ подумає, що завдання не виконане, і відправить його знову. І так, закривайте з'єднання акуратно — це не про сантехніку, а про те, щоб завершення роботи було чистим і без сюрпризів.

У цій лекції ми створили базову архітектуру для асинхронної взаємодії FastAPI і RabbitMQ за допомогою бібліотеки aio-pika. Це потужне поєднання для побудови сучасних мікросервісних систем. А в наступній лекції ми заглибимось у налаштування durable черг.

Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ