JavaRush /Курсы /Модуль 4: FastAPI /Пример создания асинхронной системы взаимодействия через ...

Пример создания асинхронной системы взаимодействия через RabbitMQ

Модуль 4: FastAPI
12 уровень , 7 лекция
Открыта

Если вы дошли до этой лекции, то, скорее всего, у вас уже есть основные знания о том, что такое RabbitMQ, как создавать очереди, продюсеров и консьюмеров, и даже как работать с ними асинхронно. Сегодня мы соберём все кусочки пазла вместе, чтобы создать работающую асинхронную систему взаимодействия с использованием FastAPI и RabbitMQ. Настройтесь на практику: будет много кода, веселья и, конечно, немного кроликов. 🐇

Давайте разберёмся, как будет выглядеть наша система. Представьте, что у нас есть приложение, в котором пользователь отправляет запросы через API (например, на загрузку и обработку файла). FastAPI принимает этот запрос и публикует задачу в очередь RabbitMQ. Консьюмер из другой службы забирает эту задачу, обрабатывает её и сохраняет результат.

Вот схема взаимодействия:


Клиент -> FastAPI (продюсер) -> RabbitMQ -> Консьюмер -> Обработка задачи -> Результат

Мы добавим асинхронность, чтобы система могла обрабатывать множество задач параллельно. Ну что, готовы? Погнали писать код! 💻


Настройка проекта

Для начала убедитесь, что у вас установлены:

  • Python 3.10+.
  • RabbitMQ (помните, что в прошлых лекциях мы его уже настроили).
  • Библиотека pika для работы с RabbitMQ (если ещё не установили: pip install pika).
  • FastAPI и Uvicorn (pip install fastapi uvicorn).

Создадим проект с такой структурой:


project/
├── producer/
│   ├── app.py
│   └── producer.py
├── consumer/
│   ├── worker.py
│   └── processor.py
└── requirements.txt

Продюсер: отправка сообщений

Начнём с создания продюсера внутри producer/producer.py. Это будет часть, которая посылает задачи в RabbitMQ.


import pika

def publish_message(message: str):
    """Функция для отправки сообщения в очередь"""
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    
    # Объявляем очередь (создаётся, если ещё нет)
    channel.queue_declare(queue='task_queue', durable=True)
    
    # Отправка сообщения
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # Делает сообщение постоянным
        )
    )
    
    print(f"[x] Отправлено: {message}")
    connection.close()

Теперь добавим API в producer/app.py, чтобы пользователь мог отправлять задачи через HTTP-запросы:


from fastapi import FastAPI
from producer import publish_message

app = FastAPI()

@app.post("/send-task/")
async def send_task(task: str):
    publish_message(task)
    return {"status": "Task sent", "task": task}

Запускаем сервер FastAPI:


uvicorn producer.app:app --reload

Попробуйте отправить запрос на /send-task/ через Postman или Curl:


curl -X POST "http://127.0.0.1:8000/send-task/" -H "Content-Type: application/json" -d '{"task": "Hello, RabbitMQ!"}'

На этом этапе наш сервер уже умеет отправлять задачи в RabbitMQ. 🔥

Консьюмер: обработка сообщений

Переходим к созданию консьюмера в consumer/worker.py. Его задача — забрать сообщение из очереди и обработать.


import pika
import time

def callback(ch, method, properties, body):
    """Функция обработки задачи"""
    print(f"[x] Получено сообщение: {body.decode()}")
    time.sleep(2)  # Сымитируем длительную обработку
    print("[x] Задача выполнена")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # Подтверждение обработки

def start_worker():
    """Запуск воркера"""
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='task_queue', durable=True)

    print("[*] Ожидание сообщений. Для выхода нажмите CTRL+C")
    channel.basic_qos(prefetch_count=1)  # Обработка только 1 задачи одновременно
    channel.basic_consume(queue='task_queue', on_message_callback=callback)

    channel.start_consuming()

if __name__ == "__main__":
    start_worker()

Запускаем воркер:


python consumer/worker.py

Асинхронность и масштабируемость

Теперь мы настроим асинхронность. Добавим несколько воркеров, чтобы обрабатывать задачи параллельно. Просто откройте несколько терминалов и запустите worker.py в каждом из них. RabbitMQ сам распределит задачи между воркерами благодаря настройке basic_qos.

Тестирование системы

  1. Отправьте несколько задач через /send-task/ — каждое сообщение будет поставлено в очередь.
  2. Следите за логами в консоль воркера. Если запущено несколько воркеров, вы заметите, как задачи распределяются между ними. Это так называемый Round-Robin.

🌟 Реальное применение

Где можно использовать такую систему:

  • Обработка изображений или видео.
  • Фоновые расчёты, например, отправка email или SMS.
  • Построение системы уведомлений.
  • И многое другое: в микросервисах этот подход просто незаменим.

Особенности управления

Несколько советов для управления системой:

  1. Используйте постоянные очереди (durable=True), чтобы данные не терялись при сбоях.
  2. Настройте механизм подтверждения обработки сообщений с помощью basic_ack.
  3. Для мониторинга RabbitMQ используйте встроенную панель управления (http://localhost:15672, если вы уже настраивали её ранее).

На этом этапе наша асинхронная система готова. Дальше можно её тестировать, оптимизировать производительность и масштабировать. Ну а если у вас есть идеи для усовершенствования этой системы — дерзайте, миру нужны новые программисты! 🐍✨

1
Задача
Модуль 4: FastAPI, 12 уровень, 7 лекция
Недоступна
Создание продюсера для отправки сообщений
Создание продюсера для отправки сообщений
1
Задача
Модуль 4: FastAPI, 12 уровень, 7 лекция
Недоступна
Создание консьюмера для обработки задач
Создание консьюмера для обработки задач
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ