JavaRush /Курсы /Модуль 4: FastAPI /Интеграция Celery в крупные системы для обработки массовы...

Интеграция Celery в крупные системы для обработки массовых данных

Модуль 4: FastAPI
13 уровень , 8 лекция
Открыта

Когда вы разрабатываете маленькое приложение, вы можете обойтись одним брокером сообщений и одним воркером Celery. Но что делать, если ваши пользователи начинают "взрывать" ваши сервера запросами, а на фоне нужно обрабатывать десятки тысяч задач? Или, скажем, вы хотите обрабатывать данные для аналитики в реальном времени? В таких случаях вам необходимо масштабировать систему.

Celery — это супергерой, у которого есть "плащ" масштабируемости. Давайте узнаем, как этот плащ использовать.

Когда одного воркера недостаточно

Прежде всего стоит понять, что воркеры Celery можно запускать на нескольких серверах, объединяя их в единую систему. Это достигается благодаря тому, что Celery воркеры слушают брокера сообщений (например, RabbitMQ или Redis), а сам брокер может быть доступен из любой точки сети.

Примерная архитектура масштабируемой системы выглядит так:


+-----------------+       +--------------------+
| FastAPI/Django  | --->  |   RabbitMQ/Redis   |
+-----------------+       +--------------------+
                                |
                                V
              +-------------------+-------------------+
              |  Celery Worker #1 |  Celery Worker #2 |
              +-------------------+-------------------+

Увеличение числа воркеров

Запустить нескольких воркеров можно буквально командой:


celery -A your_project worker --loglevel=info --concurrency=4

Ключ --concurrency задаёт максимальное количество задач, которые каждый воркер может выполнять одновременно. Если у вас есть 4 ядра на сервере, этот параметр стоит выставить равным 4. Но если у вас есть несколько серверов, вы можете запускать воркеры на каждом из них и распределять нагрузку.

Теперь ваши задачи могут обрабатываться параллельно!

Использование очередей с разными приоритетами

Celery позволяет создавать задачи с различными приоритетами и распределять их по разным очередям. Например, низкоприоритетные задачи (такие как отправка отчётов) могут обрабатываться позже, чем высокоприоритетные задачи (например, подтверждение платежей).

Для этого в конфигурации Celery можно указать несколько очередей:


from kombu import Queue

CELERY_QUEUES = (
    Queue('high_priority', routing_key='high.#'),
    Queue('default', routing_key='default.#'),
    Queue('low_priority', routing_key='low.#'),
)

CELERY_ROUTES = {
    'tasks.high_priority_task': {'queue': 'high_priority'},
    'tasks.default_task': {'queue': 'default'},
    'tasks.low_priority_task': {'queue': 'low_priority'},
}

Теперь высокоприоритетные задачи попадут в high_priority, а ваш низкоприоритетный ежедневный отчёт — в low_priority.

Настройка Prefetch Limit

Чтобы воркер не захватывал слишком много задач и не загружался до отказа, вы можете настроить, сколько задач он "предскажет". Это называется prefetch limit. Добавьте это в конфигурацию Celery:


CELERYD_PREFETCH_MULTIPLIER = 1

Если вы сделаете 1, воркер будет забирать только одну задачу за раз. Это может немного сократить производительность, но сильно уменьшит риск "захлебывания" воркера.


Интеграция Celery в существующие крупные системы

RabbitMQ и Redis — брокеры сообщений, которые поддерживаются Celery. Однако для обработки массовых данных RabbitMQ подойдёт лучше, так как он поддерживает более сложные сценарии маршрутизации и управления очередями.

Для крупных систем важна надёжность доставки сообщений. RabbitMQ позволяет настроить устойчивые очереди (durable queues), чтобы сообщения не терялись даже при сбоях.

Пример настройки устойчивой очереди в Celery:


BROKER_URL = 'pyamqp://guest@localhost//'

CELERY_TASK_QUEUES = [
    Queue('high_priority', durable=True),
]

Воркеры на мультисерверах

Теперь представьте, что ваш бизнес растёт, и вы обрабатываете миллионы задач каждый день. Один сервер уже не справляется. Здесь приходит на помощь деплой воркеров Celery на несколько серверов. Каждый воркер будет подключён к одному брокеру сообщений, но задачи будут распределяться между ними автоматически.

Например:

Server 1: Worker 1 with 4 threads
Server 2: Worker 2 with 8 threads
Server 3: Worker 3 with 4 threads

Такой подход увеличивает устойчивость системы и позволяет масштабировать её, добавляя новые воркеры по мере необходимости.

Использование Celery Flower для мониторинга

Когда система становиться большой, "глазами везде не успеешь". Для мониторинга задач используется инструмент Celery Flower. Flower показывает, какие задачи выполняются, их статус, а также позволяет отменять неудавшиеся задачи.

Установка Flower:


pip install flower

Запуск:


celery -A your_project flower --port=5555

Теперь вы можете открыть localhost:5555 и увидеть всю магию под капотом.


Примеры успешной интеграции и использования

Пример: массовое добавление транзакций в базу данных. Допустим, у вас есть система для обработки финансовых транзакций. В течение каждого часа поступают тысячи записей, которые нужно проверять и записывать в базу.

Код задачи может выглядеть так:


from celery import shared_task

@shared_task
def process_transaction(transaction_id):
    # Обработка транзакции
    transaction = get_transaction_by_id(transaction_id)
    validate_transaction(transaction)
    save_to_database(transaction)

Очередь с транзакциями может обрабатываться несколькими воркерами на нескольких серверах одновременно, что существенно повысит производительность.

ETL-пайплайны

Другой пример — обработка больших данных. Допустим, вы собираете данные из нескольких источников (CSV-файлы, API, базы данных) и выполняете ETL (Extract, Transform, Load). В этом случае каждая стадия обработки данных может быть представлена в виде отдельной задачи Celery.

Пример:


@shared_task
def extract_data(source):
    # Извлечение данных из источника
    return data

@shared_task
def transform_data(data):
    # Преобразование данных
    return transformed_data

@shared_task
def load_data(transformed_data):
    # Загрузка данных в хранилище
    save_to_warehouse(transformed_data)

Эти задачи могут выполняться в асинхронном режиме, что позволит обработать огромные объёмы данных.


Примеры реальных компаний

Многие крупные компании используют Celery для фоновых задач. Например:

  • Instagram: использует Celery для обработки миллиардов фотографий и видео.
  • Mozilla: обрабатывает аналитические данные с помощью Celery.
  • Reddit: обрабатывает массовые пуш-уведомления пользователям.

Эти примеры показывают, как мощный инструмент, такой как Celery, может справляться с огромными объёмами работы при правильной настройке.


Советы по оптимизации

  1. Разделяйте ваши очереди: используйте разные очереди для задач с разным приоритетом.
  2. Следите за производительностью системы: используйте мониторинг (например, Celery Flower).
  3. Используйте отказоустойчивые брокеры: RabbitMQ для больших проектов подходит лучше всего.
  4. Масштабируйте: добавляйте новые воркеры по мере роста нагрузки.
  5. Логируйте ошибки: никогда не игнорируйте ошибки выполнения задач.

На этом этапе вы можете уверенно интегрировать Celery в проекты практически любого масштаба и быть уверенным, что ваши фоновые задачи обрабатываются с максимальной эффективностью.

1
Задача
Модуль 4: FastAPI, 13 уровень, 8 лекция
Недоступна
Обработка нескольких задач параллельно
Обработка нескольких задач параллельно
1
Задача
Модуль 4: FastAPI, 13 уровень, 8 лекция
Недоступна
Разделение задач по очередям
Разделение задач по очередям
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ