Создание структуры FastAPI-проекта
Для начала установим aio-pika:
pip install aio-pika
Убедитесь, что RabbitMQ уже установлен и запущен. Если вы пропустили этот этап, вернитесь к лекции 63, чтобы настроить сервер.
Теперь создадим базовый проект FastAPI. Наша структура будет выглядеть так:
fastapi_rabbitmq/
├── app/
│ ├── main.py # Главная точка входа
│ ├── producer.py # Асинхронный продюсер
│ ├── consumer.py # Асинхронный консьюмер
│ ├── config.py # Конфигурация приложения
├── Dockerfile # В будущем добавим поддержку Docker
Шаг 1: настройка конфигурации
Начнем с файла config.py для подключения RabbitMQ:
# app/config.py
import os
RABBITMQ_URL = os.getenv("RABBITMQ_URL", "amqp://guest:guest@localhost/")
QUEUE_NAME = "example_queue"
Мы указали URL RabbitMQ и название очереди, которую будем использовать.
Шаг 2: создание продюсера
Теперь создадим продюсера для отправки сообщений. Это будет небольшой модуль producer.py:
# app/producer.py
import aio_pika
from app.config import RABBITMQ_URL, QUEUE_NAME
async def send_message(message: str):
# Устанавливаем соединение с RabbitMQ
connection = await aio_pika.connect_robust(RABBITMQ_URL)
async with connection:
# Создаем канал
channel = await connection.channel()
# Декларация очереди
queue = await channel.declare_queue(QUEUE_NAME, durable=True)
# Отправляем сообщение в очередь
await channel.default_exchange.publish(
aio_pika.Message(body=message.encode()),
routing_key=queue.name,
)
print(f"Сообщение отправлено: {message}")
Этот код делает следующее:
- Устанавливает асинхронное соединение с RabbitMQ.
- Создает канал связи и декларацию очереди.
- Отправляет сообщение в указанную очередь.
Шаг 3: создание консьюмера
Теперь перейдем к потребителю, который будет обрабатывать сообщения из очереди:
# app/consumer.py
import aio_pika
from app.config import RABBITMQ_URL, QUEUE_NAME
async def consume_messages():
# Устанавливаем соединение с RabbitMQ
connection = await aio_pika.connect_robust(RABBITMQ_URL)
async with connection:
# Создаем канал
channel = await connection.channel()
# Декларация очереди
queue = await channel.declare_queue(QUEUE_NAME, durable=True)
# Обработчик сообщений
async def on_message(message: aio_pika.abc.AbstractIncomingMessage):
async with message.process():
print(f"Сообщение получено: {message.body.decode()}")
# Подписываемся на очередь
await queue.consume(on_message)
print("Консьюмер слушает очередь...")
# Удерживаем подключение открытым
await connection.connected.wait()
Этот скрипт:
- Подключается к RabbitMQ.
- Создает очередь и подписывается на нее.
- Обрабатывает поступающие сообщения асинхронно.
Шаг 4: интеграция с FastAPI
Мы связали продюсера и консьюмера с RabbitMQ, теперь соединим это с FastAPI. Начнем с основного файла main.py:
# app/main.py
from fastapi import FastAPI, BackgroundTasks
from app.producer import send_message
from app.consumer import consume_messages
import asyncio
app = FastAPI()
@app.on_event("startup")
async def startup_event():
# Запускаем консьюмера в фоне при старте приложения
asyncio.create_task(consume_messages())
@app.post("/send/")
async def send_to_queue(content: str, background_tasks: BackgroundTasks):
# Отправляем сообщение в очередь
background_tasks.add_task(send_message, content)
return {"status": "Сообщение отправлено!"}
Здесь:
- При запуске приложения мы запускаем консьюмера в фоне.
- API
/send/позволяет отправлять сообщения в очередь.
Запуск сервиса
Запустите ваш FastAPI-приложение:
uvicorn app.main:app --reload
Теперь вы можете:
- Отправлять сообщения с помощью API:
curl -X POST "http://127.0.0.1:8000/send/" -H "Content-Type: application/json" -d '"Привет, RabbitMQ!"' - Наблюдать, как консьюмер принимает и обрабатывает их.
Асинхронность: зачем она нужна и что может пойти не так
Представьте современную логистику: на складе приёмщик принимает коробки, сканирует, отправляет на конвейер — и берётся за следующую. Он не ждёт, пока грузовик приедет, чтобы забрать первую коробку. Конвейер — это наша очередь сообщений, а грузовики — это консьюмеры, которые разбирают задания, когда у них есть ресурсы.
Асинхронное взаимодействие позволяет системе быть гибкой и не зависеть от того, насколько быстро кто-то на другом конце готов принять задачу. Задачи «едут» по очереди, разгружаются постепенно и без пробок, а вы в это время спокойно занимаетесь другими вещами. Это особенно важно, когда задач много и время критично.
Но при всей этой красоте нужно помнить о подводных камнях. Надёжное подключение к RabbitMQ — мастхэв, особенно в проде. Используйте aio-pika.connect_robust, чтобы приложение могло переподключиться при сбоях. Также не забывайте отправлять подтверждение (ack) после обработки сообщений — иначе RabbitMQ подумает, что задача не выполнена, и отправит её заново. И да, закрывайте соединения аккуратно — это не про сантехнику, а про то, чтобы завершение работы было чистым и без сюрпризов.
В этой лекции мы создали базовую архитектуру для асинхронного взаимодействия FastAPI и RabbitMQ с помощью библиотеки aio-pika. Это мощная связка для построения современных микросервисных систем. Ну а в следующей лекции мы погрузимся глубже в настройку постоянных (durable) очередей.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ