JavaRush /Курси /Модуль 4: FastAPI /Інтеграція Celery в великі системи для обробки масових да...

Інтеграція Celery в великі системи для обробки масових даних

Модуль 4: FastAPI
Рівень 13 , Лекція 8
Відкрита

Коли ти розробляєш маленький застосунок, можна обійтися одним брокером повідомлень і одним воркером Celery. Але що робити, якщо твої користувачі починають "валити" твої сервери запитами, а на фоні треба обробляти десятки тисяч задач? Або, скажімо, ти хочеш обробляти дані для аналітики в реальному часі? У таких випадках треба масштабувати систему.

Celery — це супергерой, у якого є "плащ" масштабування. Давай розберемося, як цей плащ використовувати.

Коли одного воркера недостатньо

Насамперед варто зрозуміти, що воркери Celery можна запускати на кількох серверах, об'єднуючи їх в єдину систему. Це досягається завдяки тому, що Celery-воркери слухають брокера повідомлень (наприклад, RabbitMQ або Redis), а сам брокер може бути доступний з будь-якої точки мережі.

Приблизна архітектура масштабованої системи виглядає так:


+-----------------+       +--------------------+
| FastAPI/Django  | --->  |   RabbitMQ/Redis   |
+-----------------+       +--------------------+
                                |
                                V
              +-------------------+-------------------+
              |  Celery Worker #1 |  Celery Worker #2 |
              +-------------------+-------------------+

Збільшення кількості воркерів

Запустити кілька воркерів можна буквально командою:


celery -A your_project worker --loglevel=info --concurrency=4

Ключ --concurrency задає максимальну кількість задач, які кожен воркер може виконувати одночасно. Якщо на сервері 4 ядра, цей параметр варто виставити рівним 4. Але якщо у тебе є кілька серверів, ти можеш запускати воркери на кожному з них і розподіляти навантаження.

Тепер твої задачі можуть оброблятися паралельно!

Використання черг з різними пріоритетами

Celery дозволяє створювати задачі з різними пріоритетами і розподіляти їх по різних чергах. Наприклад, задачі з низьким пріоритетом (такі як відправка звітів) можуть оброблятися пізніше, ніж задачі з високим пріоритетом (наприклад, підтвердження платежів).

Для цього в конфігурації Celery можна вказати кілька черг:


from kombu import Queue

CELERY_QUEUES = (
    Queue('high_priority', routing_key='high.#'),
    Queue('default', routing_key='default.#'),
    Queue('low_priority', routing_key='low.#'),
)

CELERY_ROUTES = {
    'tasks.high_priority_task': {'queue': 'high_priority'},
    'tasks.default_task': {'queue': 'default'},
    'tasks.low_priority_task': {'queue': 'low_priority'},
}

Тепер задачі з високим пріоритетом потраплятимуть у high_priority, а твій низькоприоритетний щоденний звіт — у low_priority.

Налаштування Prefetch Limit

Щоб воркер не хапав занадто багато задач і не вантажився до відмови, ти можеш налаштувати, скільки задач він "передбачає". Це називається prefetch limit. Додай це в конфігурацію Celery:


CELERYD_PREFETCH_MULTIPLIER = 1

Якщо зробити 1, воркер буде брати тільки одну задачу за раз. Це може трохи знизити продуктивність, але сильно зменшить ризик "закашлювання" воркера.


Інтеграція Celery в існуючі великі системи

RabbitMQ і Redis — брокери повідомлень, які підтримує Celery. Однак для обробки масових даних RabbitMQ підійде краще, бо він підтримує складніші сценарії маршрутизації і керування чергами.

Для великих систем важлива надійність доставки повідомлень. RabbitMQ дозволяє налаштувати стійкі черги (durable queues), щоб повідомлення не губилися навіть при збоях.

Приклад налаштування стійкої черги в Celery:


BROKER_URL = 'pyamqp://guest@localhost//'

CELERY_TASK_QUEUES = [
    Queue('high_priority', durable=True),
]

Воркери на мультисерверах

Тепер уяви, що твій бізнес росте, і ти обробляєш мільйони задач щодня. Один сервер вже не справляється. Тут на допомогу приходить деплой воркерів Celery на кілька серверів. Кожен воркер буде підключений до одного брокера повідомлень, але задачі розподілятимуться між ними автоматично.

Наприклад:

Server 1: Worker 1 with 4 threads
Server 2: Worker 2 with 8 threads
Server 3: Worker 3 with 4 threads

Такий підхід підвищує стійкість системи і дає змогу масштабувати її, додавши нові воркери в міру зростання навантаження.

Використання Celery Flower для моніторингу

Коли система стає великою, "очима всюди не встигнеш". Для моніторингу задач використовується інструмент Celery Flower. Flower показує, які задачі виконуються, їхній статус, а також дозволяє відміняти неуспішні задачі.

Встановлення Flower:


pip install flower

Запуск:


celery -A your_project flower --port=5555

Тепер можеш відкрити localhost:5555 і побачити всю магію під капотом.


Приклади успішної інтеграції та використання

Приклад: масове додавання транзакцій у базу даних. Припустимо, у тебе є система для обробки фінансових транзакцій. Щогодини надходять тисячі записів, які треба перевіряти і записувати в базу.

Код задачі може виглядати так:


from celery import shared_task

@shared_task
def process_transaction(transaction_id):
    # Обробка транзакції
    transaction = get_transaction_by_id(transaction_id)
    validate_transaction(transaction)
    save_to_database(transaction)

Черга з транзакціями може оброблятися кількома воркерами на кількох серверах одночасно, що суттєво підвищить продуктивність.

ETL-пайплайни

Інший приклад — обробка великих даних. Припустимо, ти збираєш дані з кількох джерел (CSV-файли, API, бази даних) і робиш ETL (Extract, Transform, Load). У такому випадку кожен етап обробки даних може бути представлений як окрема задача Celery.

Приклад:


@shared_task
def extract_data(source):
    # Витяг даних із джерела
    return data

@shared_task
def transform_data(data):
    # Перетворення даних
    return transformed_data

@shared_task
def load_data(transformed_data):
    # Завантаження даних у сховище
    save_to_warehouse(transformed_data)

Ці задачі можуть виконуватися асинхронно, що дозволить обробити величезні обсяги даних.


Приклади реальних компаній

Багато великих компаній використовують Celery для фонових задач. Наприклад:

  • Instagram: використовує Celery для обробки мільярдів фото і відео.
  • Mozilla: обробляє аналітичні дані за допомогою Celery.
  • Reddit: обробляє масові push-повідомлення користувачам.

Ці приклади показують, як потужний інструмент, такий як Celery, може справлятися з величезними обсягами роботи за правильної конфігурації.


Поради щодо оптимізації

  1. Розділяй свої черги: використовуйте різні черги для задач з різним пріоритетом.
  2. Слідкуй за продуктивністю системи: використовуй моніторинг (наприклад, Celery Flower).
  3. Використовуй відмовостійкі брокери: RabbitMQ для великих проєктів підходить найкраще.
  4. Масштабуй: додавай нові воркери в міру зростання навантаження.
  5. Логуй помилки: ніколи не ігноруй помилки виконання задач.

На цьому етапі ти можеш впевнено інтегрувати Celery у проєкти практично будь-якого масштабу і бути впевненим, що твої фоні задачі обробляються з максимальною ефективністю.

Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ