JavaRush /Курси /Модуль 4: FastAPI /Оптимізація роботи черг за допомогою Celery і RabbitMQ

Оптимізація роботи черг за допомогою Celery і RabbitMQ

Модуль 4: FastAPI
Рівень 14 , Лекція 4
Відкрита

Коли ви тільки впроваджуєте обробку черг з Celery і RabbitMQ, здається, що все працює надзвичайно гладко. Але з ростом навантаження, збільшенням кількості задач або користувачів система може почати буксувати. Черги стають довшими, обробка сповільнюється, а воркери починають втомлюватись (майже як програмісти в п'ятницю ввечері). Щоб уникнути такої ситуації, треба навчитися оптимізувати роботу черг і зробити нашу систему максимально продуктивною.

Використання prefetch_limit

Коли воркер забирає задачі з черги RabbitMQ, він намагається взяти якомога більше задач, щоб обробити їх "пачкою". Це пришвидшує роботу, але може призвести до проблем, якщо задач занадто багато або вони вимагають різного часу на обробку: воркер може "захопити" надто багато задач і не впоратися з їх виконанням. Щоб уникнути цього, Celery дозволяє налаштувати параметр prefetch_limit, який обмежує число задач, які воркер може взяти за один раз.

Наведемо приклад налаштування.

У файлі конфігурації Celery (celeryconfig.py) додайте наступний рядок:


worker_prefetch_multiplier = 1  # Кількість задач, які воркер бере одночасно

У цьому прикладі ми вказуємо, що воркер буде брати задачі по одній. Це корисно, якщо задачі мають різний час обробки і ви хочете мінімізувати затримки.


Налаштування якості обслуговування (QoS)

RabbitMQ підтримує налаштування якості обслуговування черг через параметри Celery. Основний параметр тут — acks_late. Коли цей прапорець увімкнений, підтвердження виконання задачі відправляється RabbitMQ тільки після її завершення. Інакше задача вважається виконаною одразу після отримання, що може призвести до втрати задач, якщо воркер впаде.

Приклад налаштування у файлі конфігурації:


task_acks_late = True  # Підтвердження виконання задачі відправляється тільки після її завершення
task_reject_on_worker_lost = True  # Якщо воркер втрачає задачу, вона повертається в чергу

Ці параметри допоможуть уникнути «залипання» задач, коли воркер падає або перевантажується під час виконання.


Оптимізація розподілу задач

Розподіл задач між воркерами можна покращити за рахунок надання задачам різних пріоритетів і використання кількох черг.

Ви можете налаштувати пріоритети задач через параметр priority. Наприклад:


from celery import Celery

app = Celery('tasks', backend='rpc://', broker='pyamqp://guest@localhost//')

@app.task(priority=10)  # Чим більше число, тим нижчий пріоритет
def low_priority_task():
    return "Це задача з низьким пріоритетом"

@app.task(priority=0)  # Вищий пріоритет
def high_priority_task():
    return "Це задача з високим пріоритетом"

RabbitMQ обробляє задачі за пріоритетом у порядку їх значень: задачі з меншими значеннями обробляються першими.

Складні системи часто розподіляють задачі по різних чергах залежно від їх типу. Наприклад, повільні задачі можна відправляти в одну чергу, а швидкі — в іншу.


CELERY_TASK_ROUTES = {
    'tasks.fast_task': {'queue': 'fast_queue'},
    'tasks.slow_task': {'queue': 'slow_queue'},
}

Відповідно, запуск воркерів для різних черг:


celery -A proj worker --queues=fast_queue
celery -A proj worker --queues=slow_queue

Використання Rate Limit для обмеження швидкості обробки задач

Якщо ваша система обробляє дані з зовнішнього API, який має обмеження по швидкості запитів, ви можете налаштувати ліміти для задач.

Наприклад, в Celery ви можете обмежувати кількість задач, що виконуються за одиницю часу:


@app.task(rate_limit='10/m')  # Не більше 10 задач за хвилину
def api_task():
    return "Задача, яка викликає зовнішній API"

Автоматичне масштабування воркерів

Коли навантаження зростає, ви можете динамічно збільшувати кількість воркерів. Celery підтримує використання autoscaler для автоматичного масштабування:


celery -A proj worker --autoscale=10,3

Де 10 — максимальна кількість воркерів, а 3 — мінімальна.


Очищення старих повідомлень з черг

RabbitMQ зберігає повідомлення в чергах доти, поки вони не будуть опрацьовані. Якщо задач занадто багато, це може призвести до збільшення часу обробки. Щоб уникнути цього, можна використовувати конфігурацію TTL (time-to-live) для повідомлень.

Наведемо приклад. При створенні черги в RabbitMQ можна вказати TTL:


from kombu import Exchange, Queue

task_queues = (
    Queue('example_queue', Exchange('example'), routing_key='example', queue_arguments={'x-message-ttl': 60000}),
)

У цьому прикладі задачі в черзі будуть видалятися, якщо воркер не встигне обробити їх за 60 секунд.


Приклади оптимізації через конфігурацію Celery і RabbitMQ

Розглянемо практичний приклад. У нас є система, що обробляє замовлення з інтернет-магазину. Кожне замовлення проходить через три задачі:

  1. Перевірка наявності товару на складі.
  2. Проведення оплати.
  3. Відправка повідомлення клієнту.

Налаштування черг:

  • Черга A: перевірка наявності товару (швидка, низьке навантаження).
  • Черга B: оплата (середнє навантаження, потребує взаємодії з API).
  • Черга C: повідомлення (високе навантаження, низький пріоритет).

Конфігурація в celeryconfig.py:


CELERY_TASK_ROUTES = {
    'tasks.check_stock': {'queue': 'queue_a'},
    'tasks.process_payment': {'queue': 'queue_b'},
    'tasks.send_notification': {'queue': 'queue_c'},
}

worker_prefetch_multiplier = 1
task_acks_late = True

Запуск воркерів для кожної черги:


celery -A shop worker --queues=queue_a --concurrency=4
celery -A shop worker --queues=queue_b --concurrency=2
celery -A shop worker --queues=queue_c --concurrency=1
  • Воркери для queue_a працюють паралельно (4 потоки).
  • Для queue_b використовується менше воркерів, щоб обмежити запити до API.
  • Для queue_c вистачає одного воркера, бо задачі мають низький пріоритет.

Підсумки налаштувань

Тепер система здатна одночасно обробляти замовлення, не "захлинаючись" задачами з великим навантаженням. Усі задачі обробляються в порядку їх пріоритетів, і система залишається стабільною завдяки конфігурації QoS, prefetch limits і TTL повідомлень.

Для моніторингу продуктивності можна скористатися Flower або RabbitMQ Management Plugin, щоб стежити за навантаженням і станом ваших черг.


celery -A shop flower

На цьому етапі ви навчилися не просто налаштовувати черги, а й оптимізувати їх для більш ефективної обробки задач. У реальних проектах це критично важливо для забезпечення стабільної роботи під навантаженням.

3
Опитування
Основи паралельної обробки завдань у системах, рівень 14, лекція 4
Недоступний
Основи паралельної обробки завдань у системах
Основи паралельної обробки завдань у системах
Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ