JavaRush /Курсы /Модуль 4: FastAPI /Асинхронная работа с очередями сообщений в FastAPI

Асинхронная работа с очередями сообщений в FastAPI

Модуль 4: FastAPI
12 уровень , 5 лекция
Открыта

Создание структуры FastAPI-проекта

Для начала установим aio-pika:

pip install aio-pika

Убедитесь, что RabbitMQ уже установлен и запущен. Если вы пропустили этот этап, вернитесь к лекции 63, чтобы настроить сервер.

Теперь создадим базовый проект FastAPI. Наша структура будет выглядеть так:

fastapi_rabbitmq/
├── app/
│   ├── main.py  # Главная точка входа
│   ├── producer.py  # Асинхронный продюсер
│   ├── consumer.py  # Асинхронный консьюмер
│   ├── config.py  # Конфигурация приложения
├── Dockerfile  # В будущем добавим поддержку Docker

Шаг 1: настройка конфигурации

Начнем с файла config.py для подключения RabbitMQ:


# app/config.py

import os

RABBITMQ_URL = os.getenv("RABBITMQ_URL", "amqp://guest:guest@localhost/")
QUEUE_NAME = "example_queue"

Мы указали URL RabbitMQ и название очереди, которую будем использовать.

Шаг 2: создание продюсера

Теперь создадим продюсера для отправки сообщений. Это будет небольшой модуль producer.py:


# app/producer.py

import aio_pika
from app.config import RABBITMQ_URL, QUEUE_NAME


async def send_message(message: str):
    # Устанавливаем соединение с RabbitMQ
    connection = await aio_pika.connect_robust(RABBITMQ_URL)
    async with connection:
        # Создаем канал
        channel = await connection.channel()

        # Декларация очереди
        queue = await channel.declare_queue(QUEUE_NAME, durable=True)

        # Отправляем сообщение в очередь
        await channel.default_exchange.publish(
            aio_pika.Message(body=message.encode()),
            routing_key=queue.name,
        )
        print(f"Сообщение отправлено: {message}")

Этот код делает следующее:

  1. Устанавливает асинхронное соединение с RabbitMQ.
  2. Создает канал связи и декларацию очереди.
  3. Отправляет сообщение в указанную очередь.

Шаг 3: создание консьюмера

Теперь перейдем к потребителю, который будет обрабатывать сообщения из очереди:


# app/consumer.py

import aio_pika
from app.config import RABBITMQ_URL, QUEUE_NAME


async def consume_messages():
    # Устанавливаем соединение с RabbitMQ
    connection = await aio_pika.connect_robust(RABBITMQ_URL)
    async with connection:
        # Создаем канал
        channel = await connection.channel()

        # Декларация очереди
        queue = await channel.declare_queue(QUEUE_NAME, durable=True)

        # Обработчик сообщений
        async def on_message(message: aio_pika.abc.AbstractIncomingMessage):
            async with message.process():
                print(f"Сообщение получено: {message.body.decode()}")

        # Подписываемся на очередь
        await queue.consume(on_message)
        print("Консьюмер слушает очередь...")

        # Удерживаем подключение открытым
        await connection.connected.wait()

Этот скрипт:

  1. Подключается к RabbitMQ.
  2. Создает очередь и подписывается на нее.
  3. Обрабатывает поступающие сообщения асинхронно.

Шаг 4: интеграция с FastAPI

Мы связали продюсера и консьюмера с RabbitMQ, теперь соединим это с FastAPI. Начнем с основного файла main.py:


# app/main.py

from fastapi import FastAPI, BackgroundTasks
from app.producer import send_message
from app.consumer import consume_messages
import asyncio

app = FastAPI()


@app.on_event("startup")
async def startup_event():
    # Запускаем консьюмера в фоне при старте приложения
    asyncio.create_task(consume_messages())


@app.post("/send/")
async def send_to_queue(content: str, background_tasks: BackgroundTasks):
    # Отправляем сообщение в очередь
    background_tasks.add_task(send_message, content)
    return {"status": "Сообщение отправлено!"}

Здесь:

  1. При запуске приложения мы запускаем консьюмера в фоне.
  2. API /send/ позволяет отправлять сообщения в очередь.

Запуск сервиса

Запустите ваш FastAPI-приложение:

uvicorn app.main:app --reload

Теперь вы можете:

  • Отправлять сообщения с помощью API:
    curl -X POST "http://127.0.0.1:8000/send/" -H "Content-Type: application/json" -d '"Привет, RabbitMQ!"'
    
  • Наблюдать, как консьюмер принимает и обрабатывает их.

Асинхронность: зачем она нужна и что может пойти не так

Представьте современную логистику: на складе приёмщик принимает коробки, сканирует, отправляет на конвейер — и берётся за следующую. Он не ждёт, пока грузовик приедет, чтобы забрать первую коробку. Конвейер — это наша очередь сообщений, а грузовики — это консьюмеры, которые разбирают задания, когда у них есть ресурсы.

Асинхронное взаимодействие позволяет системе быть гибкой и не зависеть от того, насколько быстро кто-то на другом конце готов принять задачу. Задачи «едут» по очереди, разгружаются постепенно и без пробок, а вы в это время спокойно занимаетесь другими вещами. Это особенно важно, когда задач много и время критично.

Но при всей этой красоте нужно помнить о подводных камнях. Надёжное подключение к RabbitMQ — мастхэв, особенно в проде. Используйте aio-pika.connect_robust, чтобы приложение могло переподключиться при сбоях. Также не забывайте отправлять подтверждение (ack) после обработки сообщений — иначе RabbitMQ подумает, что задача не выполнена, и отправит её заново. И да, закрывайте соединения аккуратно — это не про сантехнику, а про то, чтобы завершение работы было чистым и без сюрпризов.

В этой лекции мы создали базовую архитектуру для асинхронного взаимодействия FastAPI и RabbitMQ с помощью библиотеки aio-pika. Это мощная связка для построения современных микросервисных систем. Ну а в следующей лекции мы погрузимся глубже в настройку постоянных (durable) очередей.

1
Задача
Модуль 4: FastAPI, 12 уровень, 5 лекция
Недоступна
Создание базового асинхронного продюсера
Создание базового асинхронного продюсера
1
Задача
Модуль 4: FastAPI, 12 уровень, 5 лекция
Недоступна
Асинхронный консьюмер
Асинхронный консьюмер
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ