Створення структури FastAPI-проєкту
Для початку встановимо aio-pika:
pip install aio-pika
Переконайтеся, що RabbitMQ вже встановлений і запущений. Якщо ви пропустили цей етап, поверніться до лекції 63, щоб налаштувати сервер.
Тепер створимо базовий проєкт FastAPI. Наша структура буде виглядати так:
fastapi_rabbitmq/
├── app/
│ ├── main.py # Головна точка входу
│ ├── producer.py # Асинхронний продюсер
│ ├── consumer.py # Асинхронний консьюмер
│ ├── config.py # Конфігурація додатку
├── Dockerfile # У майбутньому додамо підтримку Docker
Крок 1: налаштування конфігурації
Почнемо з файлу config.py для підключення RabbitMQ:
# app/config.py
import os
RABBITMQ_URL = os.getenv("RABBITMQ_URL", "amqp://guest:guest@localhost/")
QUEUE_NAME = "example_queue"
Ми вказали URL RabbitMQ і назву черги, яку будемо використовувати.
Крок 2: створення продюсера
Тепер створимо продюсера для відправки повідомлень. Це буде невеликий модуль producer.py:
# app/producer.py
import aio_pika
from app.config import RABBITMQ_URL, QUEUE_NAME
async def send_message(message: str):
# Встановлюємо з'єднання з RabbitMQ
connection = await aio_pika.connect_robust(RABBITMQ_URL)
async with connection:
# Створюємо канал
channel = await connection.channel()
# Декларація черги
queue = await channel.declare_queue(QUEUE_NAME, durable=True)
# Відправляємо повідомлення в чергу
await channel.default_exchange.publish(
aio_pika.Message(body=message.encode()),
routing_key=queue.name,
)
print(f"Повідомлення відправлено: {message}")
Цей код робить наступне:
- Встановлює асинхронне з'єднання з RabbitMQ.
- Створює канал зв'язку і декларацію черги.
- Відправляє повідомлення в вказану чергу.
Крок 3: створення консьюмера
Тепер перейдемо до споживача, який буде обробляти повідомлення з черги:
# app/consumer.py
import aio_pika
from app.config import RABBITMQ_URL, QUEUE_NAME
async def consume_messages():
# Встановлюємо з'єднання з RabbitMQ
connection = await aio_pika.connect_robust(RABBITMQ_URL)
async with connection:
# Створюємо канал
channel = await connection.channel()
# Декларація черги
queue = await channel.declare_queue(QUEUE_NAME, durable=True)
# Обробник повідомлень
async def on_message(message: aio_pika.abc.AbstractIncomingMessage):
async with message.process():
print(f"Повідомлення отримано: {message.body.decode()}")
# Підписуємося на чергу
await queue.consume(on_message)
print("Консьюмер слухає чергу...")
# Утримуємо підключення відкритим
await connection.connected.wait()
Цей скрипт:
- Підключається до RabbitMQ.
- Створює чергу і підписується на неї.
- Обробляє вхідні повідомлення асинхронно.
Крок 4: інтеграція з FastAPI
Ми зв'язали продюсера і консьюмера з RabbitMQ, тепер поєднаємо це з FastAPI. Почнемо з основного файлу main.py:
# app/main.py
from fastapi import FastAPI, BackgroundTasks
from app.producer import send_message
from app.consumer import consume_messages
import asyncio
app = FastAPI()
@app.on_event("startup")
async def startup_event():
# Запускаємо консьюмера у фоновому режимі при старті додатку
asyncio.create_task(consume_messages())
@app.post("/send/")
async def send_to_queue(content: str, background_tasks: BackgroundTasks):
# Відправляємо повідомлення в чергу
background_tasks.add_task(send_message, content)
return {"status": "Повідомлення відправлено!"}
Тут:
- Під час запуску додатку ми запускаємо консьюмера у фоні.
- API
/send/дозволяє відправляти повідомлення в чергу.
Запуск сервісу
Запустіть ваш FastAPI-додаток:
uvicorn app.main:app --reload
Тепер ви можете:
- Відправляти повідомлення за допомогою API:
curl -X POST "http://127.0.0.1:8000/send/" -H "Content-Type: application/json" -d '"Привіт, RabbitMQ!"' - Спостерігати, як консьюмер приймає і обробляє їх.
Асинхронність: навіщо вона потрібна і що може піти не так
Уявіть сучасну логістику: на складі приймальник приймає коробки, сканує, відправляє на конвеєр — і береться за наступну. Він не чекає, поки приїде вантажівка, щоб забрати першу коробку. Конвеєр — це наша черга повідомлень, а вантажівки — це консьюмери, які розбирають завдання, коли в них є ресурси.
Асинхронна взаємодія дозволяє системі бути гнучкою і не залежати від того, наскільки швидко хтось на іншому кінці готовий прийняти задачу. Завдання «їдуть» по черзі, розвантажуються поступово і без заторів, а ви тим часом спокійно займаєтеся іншими речами. Це особливо важливо, коли завдань багато і час критичний.
Але при всій цій красі треба пам'ятати про підводні камені. Надійне підключення до RabbitMQ — мастхев, особливо в проді. Використовуйте aio-pika.connect_robust, щоб додаток міг перепідключитися при збої. Також не забувайте відправляти підтвердження (ack) після обробки повідомлень — інакше RabbitMQ подумає, що завдання не виконане, і відправить його знову. І так, закривайте з'єднання акуратно — це не про сантехніку, а про те, щоб завершення роботи було чистим і без сюрпризів.
У цій лекції ми створили базову архітектуру для асинхронної взаємодії FastAPI і RabbitMQ за допомогою бібліотеки aio-pika. Це потужне поєднання для побудови сучасних мікросервісних систем. А в наступній лекції ми заглибимось у налаштування durable черг.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ