Якщо ти дійшов до цієї лекції, то, найімовірніше, у тебе вже є базові знання про те, що таке RabbitMQ, як створювати черги, продюсерів і консьюмерів, і навіть як працювати з ними асинхронно. Сьогодні ми зберемо всі шматки пазлу разом, щоб зробити робочу асинхронну систему взаємодії з використанням FastAPI і RabbitMQ. Налаштуйся на практику: буде багато коду, фану і, звісно, трохи кроликів. 🐇
Давай розберемося, як виглядатиме наша система. Уяви, що в нас є застосунок, у якому користувач відправляє запити через API (наприклад, для завантаження й обробки файлу). FastAPI приймає цей запит і публікує задачу в чергу RabbitMQ. Консьюмер з іншого сервісу забирає цю задачу, обробляє її і зберігає результат.
Ось схема взаємодії:
Клієнт -> FastAPI (продюсер) -> RabbitMQ -> Консьюмер -> Обробка задачі -> Результат
Додамо асинхронності, щоб система могла обробляти купу задач паралельно. Ну що, готовий? Поїхали писати код! 💻
Налаштування проєкту
Для початку переконайся, що у тебе встановлено:
- Python 3.10+.
- RabbitMQ (пам'ятай, що в попередніх лекціях ми його вже налаштовували).
- Бібліотека
pikaдля роботи з RabbitMQ (якщо ще не встановив:pip install pika). - FastAPI і Uvicorn (
pip install fastapi uvicorn).
Створимо проєкт з такою структурою:
project/
├── producer/
│ ├── app.py
│ └── producer.py
├── consumer/
│ ├── worker.py
│ └── processor.py
└── requirements.txt
Продюсер: відправка повідомлень
Почнемо зі створення продюсера в producer/producer.py. Це та частина, яка відправляє задачі в RabbitMQ.
import pika
def publish_message(message: str):
"""Функція для відправки повідомлення в чергу"""
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Оголошуємо чергу (створюється, якщо ще немає)
channel.queue_declare(queue='task_queue', durable=True)
# Відправка повідомлення
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # Робить повідомлення стійким
)
)
print(f"[x] Надіслано: {message}")
connection.close()
Тепер додамо API в producer/app.py, щоб користувач міг відправляти задачі через HTTP-запити:
from fastapi import FastAPI
from producer import publish_message
app = FastAPI()
@app.post("/send-task/")
async def send_task(task: str):
publish_message(task)
return {"status": "Task sent", "task": task}
Запускаємо сервер FastAPI:
uvicorn producer.app:app --reload
Спробуй відправити запит на /send-task/ через Postman або Curl:
curl -X POST "http://127.0.0.1:8000/send-task/" -H "Content-Type: application/json" -d '{"task": "Hello, RabbitMQ!"}'
На цьому етапі наш сервер вже вміє відправляти задачі в RabbitMQ. 🔥
Консьюмер: обробка повідомлень
Переходимо до створення консьюмера в consumer/worker.py. Його завдання — забрати повідомлення з черги і обробити.
import pika
import time
def callback(ch, method, properties, body):
"""Функція обробки задачі"""
print(f"[x] Отримано повідомлення: {body.decode()}")
time.sleep(2) # Замоделюємо тривалу обробку
print("[x] Завдання виконане")
ch.basic_ack(delivery_tag=method.delivery_tag) # Підтвердження обробки
def start_worker():
"""Запуск воркера"""
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print("[*] Очікую повідомлень. Для виходу натисніть CTRL+C")
channel.basic_qos(prefetch_count=1) # Обробка лише 1 задачі одночасно
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
if __name__ == "__main__":
start_worker()
Запускаємо воркер:
python consumer/worker.py
Асинхронність і масштабованість
Тепер налаштуємо асинхронність. Додай кілька воркерів, щоб обробляти задачі паралельно. Просто відкрий кілька терміналів і запусти worker.py в кожному з них. RabbitMQ сам розподілить задачі між воркерами завдяки налаштуванню basic_qos.
Тестування системи
- Відправ кілька задач через
/send-task/— кожне повідомлення буде поставлене в чергу. - Слідкуй за логами в консолі воркера. Якщо запущено кілька воркерів, помітиш, як задачі розподіляються між ними. Це так званий Round-Robin.
🌟 Реальне застосування
Де можна використовувати таку систему:
- Обробка зображень або відео.
- Фонові розрахунки, наприклад, відправка email або SMS.
- Побудова системи сповіщень.
- І багато іншого: у мікросервісах цей підхід просто незамінний.
Особливості управління
Кілька порад для управління системою:
- Використовуй стійкі черги (
durable=True), щоб дані не губилися при збоях. - Налаштуй механізм підтвердження обробки повідомлень за допомогою
basic_ack. - Для моніторингу RabbitMQ використовуй вбудовану панель керування (http://localhost:15672, якщо ти вже її налаштовував раніше).
На цьому наша асинхронна система готова. Далі можна її тестувати, оптимізувати продуктивність і масштабувати. А якщо в тебе є ідеї, як покращити цю систему — дерзай, світу потрібні нові програмісти! 🐍✨
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ