JavaRush /Курсы /Модуль 4: FastAPI /Пример настройки параллельной системы с высокой нагрузкой...

Пример настройки параллельной системы с высокой нагрузкой

Модуль 4: FastAPI
14 уровень , 9 лекция
Открыта

Постановка задачи

Представим, что мы разрабатываем приложение, где пользователи загружают изображения, а наша система обрабатывает их (например, добавляет водяные знаки, уменьшает размер, преобразует формат). При этом количество пользователей растет, и система должна масштабироваться пропорционально нагрузке.

Основные требования:

  1. Обработка изображений должна быть асинхронной.
  2. Использование нескольких очередей, чтобы разные типы задач (например, малые и большие изображения) не блокировали друг друга.
  3. Поддержка высокой нагрузки с сотнями задач в секунду.
  4. Мониторинг и управление системой.

Подготовка окружения

Для работы нам понадобятся:

  • RabbitMQ для работы с очередями.
  • Celery для обработки задач.

Если вы пропустили предыдущие лекции, то вот быстрый способ развернуть RabbitMQ с помощью Docker:


docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

И не забудьте установить необходимые библиотеки:


pip install celery[redis] pillow

Настройка проекта

Мы будем использовать следующий шаблон проекта:


/parallel_image_processor
    ├── app.py
    ├── celery_app.py
    ├── tasks.py
    ├── settings.py
    ├── utils.py
    └── requirements.txt

Настройка Celery и RabbitMQ.

Создадим файл settings.py для хранения конфигураций:


# settings.py

# Настройки RabbitMQ
BROKER_URL = 'pyamqp://guest:guest@localhost//'

# Настройки Celery
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'

Теперь подключим Celery. Создадим файл celery_app.py:


# celery_app.py

from celery import Celery
from settings import BROKER_URL, CELERY_RESULT_BACKEND

# Создаём экземпляр Celery приложения
celery_app = Celery('parallel_image_processor', broker=BROKER_URL, backend=CELERY_RESULT_BACKEND)

# Загружаем настройки
celery_app.conf.update(
    task_routes={
        'tasks.process_small_image': {'queue': 'small_images'},
        'tasks.process_large_image': {'queue': 'large_images'},
    }
)

Задачи

Добавим файл tasks.py и определим задачи:


# tasks.py

import time
from celery_app import celery_app
from utils import resize_image, add_watermark

@celery_app.task
def process_small_image(image_path):
    """Обрабатывает маленькое изображение."""
    print(f"Processing small image: {image_path}")
    resize_image(image_path, size=(200, 200))  # Уменьшаем изображение
    time.sleep(2)  # Эмуляция долгой загрузки
    print(f"Small image processed: {image_path}")

@celery_app.task
def process_large_image(image_path):
    """Обрабатывает большое изображение."""
    print(f"Processing large image: {image_path}")
    add_watermark(image_path, watermark="© Your Brand")  # Добавляем водяной знак
    time.sleep(5)  # Эмуляция долгой загрузки
    print(f"Large image processed: {image_path}")

Функции resize_image и add_watermark находятся в utils.py.


Настройка мониторинга

Flower — это веб-интерфейс для мониторинга Celery. Установим его:


pip install flower

Запустим Flower для отслеживания задач:


celery -A celery_app flower

Теперь Flower доступен по адресу: http://localhost:5555. Здесь мы можем увидеть активные задачи, статистику воркеров и другие метрики.


Запуск системы

Воркеры должны обрабатывать задачи из разных очередей:


celery -A celery_app worker --loglevel=info --queues=small_images
celery -A celery_app worker --loglevel=info --queues=large_images

Публикация задач.

Добавим в app.py простой пример:


# app.py

from tasks import process_small_image, process_large_image

# Публикуем задачи в очереди
process_small_image.delay("path/to/small_image.jpg")
process_large_image.delay("path/to/large_image.jpg")

print("Tasks published!")

Запустите этот файл, чтобы задачи начали обрабатываться.


Анализ производительности

После запуска системы можно провести нагрузочное тестирование, например, с помощью инструментов вроде Locust или Apache JMeter. Отследите, как увеличивается время выполнения задач при росте числа задач, и добавьте больше воркеров при необходимости.


Оптимизация

Если заметили узкие места, вот несколько возможных решений:

  • Увеличьте prefetch_limit для воркеров, чтобы задачи распределялись равномерно.
  • Добавьте больше продюсеров/консьюмеров.
  • Настройте QoS для приоритетной обработки задач.
  • Используйте мониторинг и анализируйте метрики RabbitMQ, чтобы выявить перегруженные очереди.

Пример нагрузки

Проверьте, как система справляется с высоким трафиком. Запустите множество задач через цикл:


for i in range(100):
    process_small_image.delay(f"small_image_{i}.jpg")
    process_large_image.delay(f"large_image_{i}.jpg")

Отслеживайте, как очереди заполняются и как воркеры их обрабатывают.


Итоговый результат

Наша система справляется с сотнями задач в секунду благодаря использованию нескольких очередей, распределению нагрузки и параллельной обработке. А с помощью инструментов мониторинга мы получили возможность управлять и оптимизировать производительность в реальном времени.


🎉 Поздравляю, вы только что настроили параллельную систему с высокой нагрузкой! Теперь вы знаете, как применять Celery и RabbitMQ в реальных проектах.

Если вдруг что-то пошло не так — не беда! Как говорят опытные разработчики,
"ошибки — это способ системы сказать вам, что вы еще не всё о ней узнали". Удачи в ваших проектах!

1
Задача
Модуль 4: FastAPI, 14 уровень, 9 лекция
Недоступна
Создание нескольких очередей для разных задач
Создание нескольких очередей для разных задач
3
Опрос
Балансировка нагрузки между продюсерами и консьюмерами, 14 уровень, 9 лекция
Недоступен
Балансировка нагрузки между продюсерами и консьюмерами
Балансировка нагрузки между продюсерами и консьюмерами
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ