Если вы дошли до этой лекции, то, скорее всего, у вас уже есть основные знания о том, что такое RabbitMQ, как создавать очереди, продюсеров и консьюмеров, и даже как работать с ними асинхронно. Сегодня мы соберём все кусочки пазла вместе, чтобы создать работающую асинхронную систему взаимодействия с использованием FastAPI и RabbitMQ. Настройтесь на практику: будет много кода, веселья и, конечно, немного кроликов. 🐇
Давайте разберёмся, как будет выглядеть наша система. Представьте, что у нас есть приложение, в котором пользователь отправляет запросы через API (например, на загрузку и обработку файла). FastAPI принимает этот запрос и публикует задачу в очередь RabbitMQ. Консьюмер из другой службы забирает эту задачу, обрабатывает её и сохраняет результат.
Вот схема взаимодействия:
Клиент -> FastAPI (продюсер) -> RabbitMQ -> Консьюмер -> Обработка задачи -> Результат
Мы добавим асинхронность, чтобы система могла обрабатывать множество задач параллельно. Ну что, готовы? Погнали писать код! 💻
Настройка проекта
Для начала убедитесь, что у вас установлены:
- Python 3.10+.
- RabbitMQ (помните, что в прошлых лекциях мы его уже настроили).
- Библиотека
pikaдля работы с RabbitMQ (если ещё не установили:pip install pika). - FastAPI и Uvicorn (
pip install fastapi uvicorn).
Создадим проект с такой структурой:
project/
├── producer/
│ ├── app.py
│ └── producer.py
├── consumer/
│ ├── worker.py
│ └── processor.py
└── requirements.txt
Продюсер: отправка сообщений
Начнём с создания продюсера внутри producer/producer.py. Это будет часть, которая посылает задачи в RabbitMQ.
import pika
def publish_message(message: str):
"""Функция для отправки сообщения в очередь"""
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Объявляем очередь (создаётся, если ещё нет)
channel.queue_declare(queue='task_queue', durable=True)
# Отправка сообщения
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # Делает сообщение постоянным
)
)
print(f"[x] Отправлено: {message}")
connection.close()
Теперь добавим API в producer/app.py, чтобы пользователь мог отправлять задачи через HTTP-запросы:
from fastapi import FastAPI
from producer import publish_message
app = FastAPI()
@app.post("/send-task/")
async def send_task(task: str):
publish_message(task)
return {"status": "Task sent", "task": task}
Запускаем сервер FastAPI:
uvicorn producer.app:app --reload
Попробуйте отправить запрос на /send-task/ через Postman или Curl:
curl -X POST "http://127.0.0.1:8000/send-task/" -H "Content-Type: application/json" -d '{"task": "Hello, RabbitMQ!"}'
На этом этапе наш сервер уже умеет отправлять задачи в RabbitMQ. 🔥
Консьюмер: обработка сообщений
Переходим к созданию консьюмера в consumer/worker.py. Его задача — забрать сообщение из очереди и обработать.
import pika
import time
def callback(ch, method, properties, body):
"""Функция обработки задачи"""
print(f"[x] Получено сообщение: {body.decode()}")
time.sleep(2) # Сымитируем длительную обработку
print("[x] Задача выполнена")
ch.basic_ack(delivery_tag=method.delivery_tag) # Подтверждение обработки
def start_worker():
"""Запуск воркера"""
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print("[*] Ожидание сообщений. Для выхода нажмите CTRL+C")
channel.basic_qos(prefetch_count=1) # Обработка только 1 задачи одновременно
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
if __name__ == "__main__":
start_worker()
Запускаем воркер:
python consumer/worker.py
Асинхронность и масштабируемость
Теперь мы настроим асинхронность. Добавим несколько воркеров, чтобы обрабатывать задачи параллельно. Просто откройте несколько терминалов и запустите worker.py в каждом из них. RabbitMQ сам распределит задачи между воркерами благодаря настройке basic_qos.
Тестирование системы
- Отправьте несколько задач через
/send-task/— каждое сообщение будет поставлено в очередь. - Следите за логами в консоль воркера. Если запущено несколько воркеров, вы заметите, как задачи распределяются между ними. Это так называемый Round-Robin.
🌟 Реальное применение
Где можно использовать такую систему:
- Обработка изображений или видео.
- Фоновые расчёты, например, отправка email или SMS.
- Построение системы уведомлений.
- И многое другое: в микросервисах этот подход просто незаменим.
Особенности управления
Несколько советов для управления системой:
- Используйте постоянные очереди (
durable=True), чтобы данные не терялись при сбоях. - Настройте механизм подтверждения обработки сообщений с помощью
basic_ack. - Для мониторинга RabbitMQ используйте встроенную панель управления (http://localhost:15672, если вы уже настраивали её ранее).
На этом этапе наша асинхронная система готова. Дальше можно её тестировать, оптимизировать производительность и масштабировать. Ну а если у вас есть идеи для усовершенствования этой системы — дерзайте, миру нужны новые программисты! 🐍✨
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ