Работа с несколькими очередями позволяет разделять задачи по категориям или приоритетам. Например:
- Очередь
low_priorityдля задач, выполнение которых может подождать. - Очередь
high_priorityдля срочных задач. - Очередь
data_processingдля трудозатратных операций, таких как обработка данных.
Использование нескольких очередей помогает лучше организовать систему и избежать ситуации, когда низкоприоритетные задачи блокируют выполнение важных задач.
Установка сцены: структура проекта
Мы создадим пример на основе FastAPI и Celery, подключенных к RabbitMQ. Задачи будут распределяться по нескольким очередям:
- Очередь
high_priorityдля срочных задач, таких как отправка уведомлений. - Очередь
low_priorityдля менее срочных задач, таких как сбор аналитики. - Очередь
data_processingдля задач обработки больших объемов данных.
Подготовка окружения
Для начала убедимся, что у нас установлены необходимые библиотеки. Выполните следующую установку:
pip install fastapi[all] celery[redis] uvicorn
Также нам понадобится RabbitMQ. Если у вас его еще нет, установите RabbitMQ через Docker:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
После запуска RabbitMQ откройте интерфейс администрирования по адресу http://localhost:15672 (логин/пароль: guest/guest).
Конфигурация Celery с несколькими очередями
В корне проекта создадим файл celery_config.py. В нем укажем настройки для Celery:
from celery import Celery
# Создаем объект Celery
celery = Celery(
"tasks",
broker="pyamqp://guest:guest@localhost:5672//",
backend="rpc://"
)
# Конфигурация очередей
celery.conf.task_routes = {
"tasks.send_notification": {"queue": "high_priority"},
"tasks.process_data": {"queue": "data_processing"},
"tasks.log_analytics": {"queue": "low_priority"},
}
celery.conf.task_default_queue = "default"
Создадим файл tasks.py, где определим три типа задач: отправка уведомлений, обработка данных и логирование аналитики:
from celery_config import celery
import time
@celery.task
def send_notification(message):
print(f"Sending notification: {message}")
time.sleep(2)
return f"Notification '{message}' sent!"
@celery.task
def process_data(data):
print(f"Processing data: {data}")
time.sleep(5)
return f"Data '{data}' processed!"
@celery.task
def log_analytics(event):
print(f"Logging analytics event: {event}")
time.sleep(1)
return f"Analytics event '{event}' logged!"
Запуск воркеров для нескольких очередей
Теперь нам нужно запустить воркеры, которые будут обрабатывать задания из разных очередей. Для этого используем разные команды:
Воркер для высокоприоритетных задач
celery -A tasks worker --loglevel=info --queues=high_priority
Воркер для обработки данных
celery -A tasks worker --loglevel=info --queues=data_processing
Воркер для задач с низким приоритетом
celery -A tasks worker --loglevel=info --queues=low_priority
Воркер для задач по умолчанию
celery -A tasks worker --loglevel=info
Интеграция с FastAPI
Теперь интегрируем FastAPI с Celery, чтобы взаимодействовать с задачами через HTTP API. Создадим файл main.py:
from fastapi import FastAPI
from tasks import send_notification, process_data, log_analytics
app = FastAPI()
@app.post("/notify/")
async def notify_user(message: str):
task = send_notification.delay(message)
return {"task_id": task.id, "status": "queued"}
@app.post("/process/")
async def process_user_data(data: str):
task = process_data.delay(data)
return {"task_id": task.id, "status": "queued"}
@app.post("/analytics/")
async def log_event(event: str):
task = log_analytics.delay(event)
return {"task_id": task.id, "status": "queued"}
Запустите FastAPI-приложение:
uvicorn main:app --reload
Теперь вы можете отправлять задачи через следующие эндпоинты:
- POST
/notify/— отправка уведомлений (очередьhigh_priority). - POST
/process/— обработка данных (очередьdata_processing). - POST
/analytics/— логирование аналитики (очередьlow_priority).
Проверка производительности
Попробуем отправить несколько задач в разные очереди и посмотрим, как они обрабатываются. Отправьте запросы с помощью curl или Postman.
Пример запросов
curl -X POST http://127.0.0.1:8000/notify/ -H "Content-Type: application/json" -d '{"message": "Hello, World!"}'
curl -X POST http://127.0.0.1:8000/process/ -H "Content-Type: application/json" -d '{"data": "Big Data"}'
curl -X POST http://127.0.0.1:8000/analytics/ -H "Content-Type: application/json" -d '{"event": "UserLoggedIn"}'
В логах воркеров вы увидите, как задачи распределяются по очередям и обрабатываются соответствующими воркерами.
Анализ производительности
Обратите внимание на следующие моменты:
- Изоляция очередей: каждая очередь обрабатывается своим воркером, что позволяет лучше распределять нагрузку.
- Приоритетность: срочные задачи обрабатываются быстрее благодаря отдельной очереди.
- Масштабируемость: в случае увеличения нагрузки вы можете запустить дополнительные воркеры для конкретной очереди.
Важные аспекты и типичные ошибки
При работе с несколькими очередями нужно учитывать:
- Отсутствие очередей по умолчанию: если задача не связана с очередью, она перейдет в очередь, заданную параметром
task_default_queue. - Блокировка задач: убедитесь, что задачи не имеют пересечений между очередями, чтобы избежать блокировок.
- Ресурсы воркеров: следите, чтобы воркеры не перегружали сервер.
Для повышения надежности настройте мониторинг RabbitMQ через RabbitMQ Management Interface или сторонние инструменты, такие как Prometheus.
Выводы
Поздравляю, теперь вы освоили разделение задач по очередям! Использование нескольких очередей не только улучшает производительность системы, но и делает её более гибкой и отказоустойчивой. Продолжайте экспериментировать с настройками и оптимизацией — впереди ещё много интересного!
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ