Постановка задачи
Представим, что мы разрабатываем приложение, где пользователи загружают изображения, а наша система обрабатывает их (например, добавляет водяные знаки, уменьшает размер, преобразует формат). При этом количество пользователей растет, и система должна масштабироваться пропорционально нагрузке.
Основные требования:
- Обработка изображений должна быть асинхронной.
- Использование нескольких очередей, чтобы разные типы задач (например, малые и большие изображения) не блокировали друг друга.
- Поддержка высокой нагрузки с сотнями задач в секунду.
- Мониторинг и управление системой.
Подготовка окружения
Для работы нам понадобятся:
- RabbitMQ для работы с очередями.
- Celery для обработки задач.
Если вы пропустили предыдущие лекции, то вот быстрый способ развернуть RabbitMQ с помощью Docker:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
И не забудьте установить необходимые библиотеки:
pip install celery[redis] pillow
Настройка проекта
Мы будем использовать следующий шаблон проекта:
/parallel_image_processor
├── app.py
├── celery_app.py
├── tasks.py
├── settings.py
├── utils.py
└── requirements.txt
Настройка Celery и RabbitMQ.
Создадим файл settings.py для хранения конфигураций:
# settings.py
# Настройки RabbitMQ
BROKER_URL = 'pyamqp://guest:guest@localhost//'
# Настройки Celery
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
Теперь подключим Celery. Создадим файл celery_app.py:
# celery_app.py
from celery import Celery
from settings import BROKER_URL, CELERY_RESULT_BACKEND
# Создаём экземпляр Celery приложения
celery_app = Celery('parallel_image_processor', broker=BROKER_URL, backend=CELERY_RESULT_BACKEND)
# Загружаем настройки
celery_app.conf.update(
task_routes={
'tasks.process_small_image': {'queue': 'small_images'},
'tasks.process_large_image': {'queue': 'large_images'},
}
)
Задачи
Добавим файл tasks.py и определим задачи:
# tasks.py
import time
from celery_app import celery_app
from utils import resize_image, add_watermark
@celery_app.task
def process_small_image(image_path):
"""Обрабатывает маленькое изображение."""
print(f"Processing small image: {image_path}")
resize_image(image_path, size=(200, 200)) # Уменьшаем изображение
time.sleep(2) # Эмуляция долгой загрузки
print(f"Small image processed: {image_path}")
@celery_app.task
def process_large_image(image_path):
"""Обрабатывает большое изображение."""
print(f"Processing large image: {image_path}")
add_watermark(image_path, watermark="© Your Brand") # Добавляем водяной знак
time.sleep(5) # Эмуляция долгой загрузки
print(f"Large image processed: {image_path}")
Функции resize_image и add_watermark находятся в utils.py.
Настройка мониторинга
Flower — это веб-интерфейс для мониторинга Celery. Установим его:
pip install flower
Запустим Flower для отслеживания задач:
celery -A celery_app flower
Теперь Flower доступен по адресу: http://localhost:5555. Здесь мы можем увидеть активные задачи, статистику воркеров и другие метрики.
Запуск системы
Воркеры должны обрабатывать задачи из разных очередей:
celery -A celery_app worker --loglevel=info --queues=small_images
celery -A celery_app worker --loglevel=info --queues=large_images
Публикация задач.
Добавим в app.py простой пример:
# app.py
from tasks import process_small_image, process_large_image
# Публикуем задачи в очереди
process_small_image.delay("path/to/small_image.jpg")
process_large_image.delay("path/to/large_image.jpg")
print("Tasks published!")
Запустите этот файл, чтобы задачи начали обрабатываться.
Анализ производительности
После запуска системы можно провести нагрузочное тестирование, например, с помощью инструментов вроде Locust или Apache JMeter. Отследите, как увеличивается время выполнения задач при росте числа задач, и добавьте больше воркеров при необходимости.
Оптимизация
Если заметили узкие места, вот несколько возможных решений:
- Увеличьте
prefetch_limitдля воркеров, чтобы задачи распределялись равномерно. - Добавьте больше продюсеров/консьюмеров.
- Настройте QoS для приоритетной обработки задач.
- Используйте мониторинг и анализируйте метрики RabbitMQ, чтобы выявить перегруженные очереди.
Пример нагрузки
Проверьте, как система справляется с высоким трафиком. Запустите множество задач через цикл:
for i in range(100):
process_small_image.delay(f"small_image_{i}.jpg")
process_large_image.delay(f"large_image_{i}.jpg")
Отслеживайте, как очереди заполняются и как воркеры их обрабатывают.
Итоговый результат
Наша система справляется с сотнями задач в секунду благодаря использованию нескольких очередей, распределению нагрузки и параллельной обработке. А с помощью инструментов мониторинга мы получили возможность управлять и оптимизировать производительность в реальном времени.
🎉 Поздравляю, вы только что настроили параллельную систему с высокой нагрузкой! Теперь вы знаете, как применять Celery и RabbitMQ в реальных проектах.
Если вдруг что-то пошло не так — не беда! Как говорят опытные разработчики,
"ошибки — это способ системы сказать вам, что вы еще не всё о ней узнали". Удачи в ваших проектах!
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ