JavaRush /Курси /Модуль 4: FastAPI /Приклад паралельної обробки задач із використанням кілько...

Приклад паралельної обробки задач із використанням кількох черг

Модуль 4: FastAPI
Рівень 14 , Лекція 5
Відкрита

Робота з кількома чергами дозволяє розділяти задачі за категоріями або пріоритетами. Наприклад:

  • Черга low_priority для задач, виконання яких може почекати.
  • Черга high_priority для термінових задач.
  • Черга data_processing для ресурсомістких операцій, таких як обробка даних.

Використання кількох черг допомагає краще організувати систему й уникнути ситуації, коли низькоприоритетні задачі блокують виконання важливих.


Постановка задачі: структура проєкту

Зробимо приклад на базі FastAPI і Celery, підключених до RabbitMQ. Задачі будуть розподілятися по кількох чергах:

  1. Черга high_priority для термінових задач, таких як відправка повідомлень.
  2. Черга low_priority для менш термінових задач, таких як збір аналітики.
  3. Черга data_processing для задач обробки великих обсягів даних.

Підготовка оточення

Для початку переконайся, що в тебе встановлені потрібні бібліотеки. Виконай таке встановлення:


pip install fastapi[all] celery[redis] uvicorn

Також знадобиться RabbitMQ. Якщо в тебе його ще нема, запусти RabbitMQ через Docker:


docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Після запуску RabbitMQ відкрий інтерфейс адміністрування за адресою http://localhost:15672 (логін/пароль: guest/guest).


Конфігурація Celery з кількома чергами

У корені проєкту створимо файл celery_config.py. У ньому вкажемо налаштування для Celery:


from celery import Celery

# Створюємо об'єкт Celery
celery = Celery(
    "tasks",
    broker="pyamqp://guest:guest@localhost:5672//",
    backend="rpc://"
)

# Конфігурація черг
celery.conf.task_routes = {
    "tasks.send_notification": {"queue": "high_priority"},
    "tasks.process_data": {"queue": "data_processing"},
    "tasks.log_analytics": {"queue": "low_priority"},
}

celery.conf.task_default_queue = "default"

Створимо файл tasks.py, де визначимо три типи задач: відправлення повідомлень, обробка даних і логування аналітики:


from celery_config import celery
import time

@celery.task
def send_notification(message):
    print(f"Sending notification: {message}")
    time.sleep(2)
    return f"Notification '{message}' sent!"

@celery.task
def process_data(data):
    print(f"Processing data: {data}")
    time.sleep(5)
    return f"Data '{data}' processed!"

@celery.task
def log_analytics(event):
    print(f"Logging analytics event: {event}")
    time.sleep(1)
    return f"Analytics event '{event}' logged!"

Запуск воркерів для кількох черг

Тепер треба запустити воркери, які оброблятимуть завдання з різних черг. Для цього використовуємо різні команди:

Воркер для задач високого пріоритету


celery -A tasks worker --loglevel=info --queues=high_priority

Воркер для обробки даних


celery -A tasks worker --loglevel=info --queues=data_processing

Воркер для задач з низьким пріоритетом


celery -A tasks worker --loglevel=info --queues=low_priority

Воркер для задач за замовчуванням


celery -A tasks worker --loglevel=info

Інтеграція з FastAPI

Тепер інтегруємо FastAPI з Celery, щоб взаємодіяти з задачами через HTTP API. Створимо файл main.py:


from fastapi import FastAPI
from tasks import send_notification, process_data, log_analytics

app = FastAPI()

@app.post("/notify/")
async def notify_user(message: str):
    task = send_notification.delay(message)
    return {"task_id": task.id, "status": "queued"}

@app.post("/process/")
async def process_user_data(data: str):
    task = process_data.delay(data)
    return {"task_id": task.id, "status": "queued"}

@app.post("/analytics/")
async def log_event(event: str):
    task = log_analytics.delay(event)
    return {"task_id": task.id, "status": "queued"}

Запусти FastAPI-додаток:


uvicorn main:app --reload

Тепер можеш відправляти задачі через такі ендпоінти:

  • POST /notify/ — відправлення повідомлень (черга high_priority).
  • POST /process/ — обробка даних (черга data_processing).
  • POST /analytics/ — логування аналітики (черга low_priority).

Перевірка продуктивності

Спробуємо відправити кілька задач у різні черги й подивимось, як їх обробляють. Відправляй запити через curl або Postman.

Приклад запитів


curl -X POST http://127.0.0.1:8000/notify/ -H "Content-Type: application/json" -d '{"message": "Hello, World!"}'
curl -X POST http://127.0.0.1:8000/process/ -H "Content-Type: application/json" -d '{"data": "Big Data"}'
curl -X POST http://127.0.0.1:8000/analytics/ -H "Content-Type: application/json" -d '{"event": "UserLoggedIn"}'

У логах воркерів побачиш, як задачі розподіляються по чергах і обробляються відповідними воркерами.


Аналіз продуктивності

Зверни увагу на такі моменти:

  1. Ізоляція черг: кожну чергу обробляє свій воркер, що дозволяє краще розподіляти навантаження.
  2. Пріоритетність: термінові задачі обробляються швидше завдяки окремій черзі.
  3. Масштабованість: у разі збільшення навантаження можна запустити додаткові воркери для конкретної черги.

Важливі аспекти та типові помилки

При роботі з кількома чергами слід враховувати:

  • Відсутність черг за замовчуванням: якщо задача не прив'язана до черги, вона потрапить у чергу, вказану в параметрі task_default_queue.
  • Блокування задач: переконайся, що задачі не перетинаються між чергами, щоб уникнути блокувань.
  • Ресурси воркерів: слідкуй, щоб воркери не перевантажували сервер.

Для підвищення надійності налаштуй моніторинг RabbitMQ через RabbitMQ Management Interface або сторонні інструменти, такі як Prometheus.


Висновки

Вітаю, тепер ти засвоїв розподіл задач по чергах! Використання кількох черг не тільки покращує продуктивність системи, але й робить її гнучкішою та відмовостійкішою. Продовжуй експериментувати з налаштуваннями та оптимізацією — попереду ще багато цікавого!

Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ