JavaRush /Курсы /Модуль 4: FastAPI /Пример параллельной обработки задач с использованием неск...

Пример параллельной обработки задач с использованием нескольких очередей

Модуль 4: FastAPI
14 уровень , 5 лекция
Открыта

Работа с несколькими очередями позволяет разделять задачи по категориям или приоритетам. Например:

  • Очередь low_priority для задач, выполнение которых может подождать.
  • Очередь high_priority для срочных задач.
  • Очередь data_processing для трудозатратных операций, таких как обработка данных.

Использование нескольких очередей помогает лучше организовать систему и избежать ситуации, когда низкоприоритетные задачи блокируют выполнение важных задач.


Установка сцены: структура проекта

Мы создадим пример на основе FastAPI и Celery, подключенных к RabbitMQ. Задачи будут распределяться по нескольким очередям:

  1. Очередь high_priority для срочных задач, таких как отправка уведомлений.
  2. Очередь low_priority для менее срочных задач, таких как сбор аналитики.
  3. Очередь data_processing для задач обработки больших объемов данных.

Подготовка окружения

Для начала убедимся, что у нас установлены необходимые библиотеки. Выполните следующую установку:


pip install fastapi[all] celery[redis] uvicorn

Также нам понадобится RabbitMQ. Если у вас его еще нет, установите RabbitMQ через Docker:


docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

После запуска RabbitMQ откройте интерфейс администрирования по адресу http://localhost:15672 (логин/пароль: guest/guest).


Конфигурация Celery с несколькими очередями

В корне проекта создадим файл celery_config.py. В нем укажем настройки для Celery:


from celery import Celery

# Создаем объект Celery
celery = Celery(
    "tasks",
    broker="pyamqp://guest:guest@localhost:5672//",
    backend="rpc://"
)

# Конфигурация очередей
celery.conf.task_routes = {
    "tasks.send_notification": {"queue": "high_priority"},
    "tasks.process_data": {"queue": "data_processing"},
    "tasks.log_analytics": {"queue": "low_priority"},
}

celery.conf.task_default_queue = "default"

Создадим файл tasks.py, где определим три типа задач: отправка уведомлений, обработка данных и логирование аналитики:


from celery_config import celery
import time

@celery.task
def send_notification(message):
    print(f"Sending notification: {message}")
    time.sleep(2)
    return f"Notification '{message}' sent!"

@celery.task
def process_data(data):
    print(f"Processing data: {data}")
    time.sleep(5)
    return f"Data '{data}' processed!"

@celery.task
def log_analytics(event):
    print(f"Logging analytics event: {event}")
    time.sleep(1)
    return f"Analytics event '{event}' logged!"

Запуск воркеров для нескольких очередей

Теперь нам нужно запустить воркеры, которые будут обрабатывать задания из разных очередей. Для этого используем разные команды:

Воркер для высокоприоритетных задач


celery -A tasks worker --loglevel=info --queues=high_priority

Воркер для обработки данных


celery -A tasks worker --loglevel=info --queues=data_processing

Воркер для задач с низким приоритетом


celery -A tasks worker --loglevel=info --queues=low_priority

Воркер для задач по умолчанию


celery -A tasks worker --loglevel=info

Интеграция с FastAPI

Теперь интегрируем FastAPI с Celery, чтобы взаимодействовать с задачами через HTTP API. Создадим файл main.py:


from fastapi import FastAPI
from tasks import send_notification, process_data, log_analytics

app = FastAPI()

@app.post("/notify/")
async def notify_user(message: str):
    task = send_notification.delay(message)
    return {"task_id": task.id, "status": "queued"}

@app.post("/process/")
async def process_user_data(data: str):
    task = process_data.delay(data)
    return {"task_id": task.id, "status": "queued"}

@app.post("/analytics/")
async def log_event(event: str):
    task = log_analytics.delay(event)
    return {"task_id": task.id, "status": "queued"}

Запустите FastAPI-приложение:


uvicorn main:app --reload

Теперь вы можете отправлять задачи через следующие эндпоинты:

  • POST /notify/ — отправка уведомлений (очередь high_priority).
  • POST /process/ — обработка данных (очередь data_processing).
  • POST /analytics/ — логирование аналитики (очередь low_priority).

Проверка производительности

Попробуем отправить несколько задач в разные очереди и посмотрим, как они обрабатываются. Отправьте запросы с помощью curl или Postman.

Пример запросов


curl -X POST http://127.0.0.1:8000/notify/ -H "Content-Type: application/json" -d '{"message": "Hello, World!"}'
curl -X POST http://127.0.0.1:8000/process/ -H "Content-Type: application/json" -d '{"data": "Big Data"}'
curl -X POST http://127.0.0.1:8000/analytics/ -H "Content-Type: application/json" -d '{"event": "UserLoggedIn"}'

В логах воркеров вы увидите, как задачи распределяются по очередям и обрабатываются соответствующими воркерами.


Анализ производительности

Обратите внимание на следующие моменты:

  1. Изоляция очередей: каждая очередь обрабатывается своим воркером, что позволяет лучше распределять нагрузку.
  2. Приоритетность: срочные задачи обрабатываются быстрее благодаря отдельной очереди.
  3. Масштабируемость: в случае увеличения нагрузки вы можете запустить дополнительные воркеры для конкретной очереди.

Важные аспекты и типичные ошибки

При работе с несколькими очередями нужно учитывать:

  • Отсутствие очередей по умолчанию: если задача не связана с очередью, она перейдет в очередь, заданную параметром task_default_queue.
  • Блокировка задач: убедитесь, что задачи не имеют пересечений между очередями, чтобы избежать блокировок.
  • Ресурсы воркеров: следите, чтобы воркеры не перегружали сервер.

Для повышения надежности настройте мониторинг RabbitMQ через RabbitMQ Management Interface или сторонние инструменты, такие как Prometheus.


Выводы

Поздравляю, теперь вы освоили разделение задач по очередям! Использование нескольких очередей не только улучшает производительность системы, но и делает её более гибкой и отказоустойчивой. Продолжайте экспериментировать с настройками и оптимизацией — впереди ещё много интересного!

1
Задача
Модуль 4: FastAPI, 14 уровень, 5 лекция
Недоступна
Создание и интеграция многоприоритетных очередей
Создание и интеграция многоприоритетных очередей
1
Задача
Модуль 4: FastAPI, 14 уровень, 5 лекция
Недоступна
Запуск воркеров для обработки нескольких очередей
Запуск воркеров для обработки нескольких очередей
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ