JavaRush /Курсы /Модуль 4: FastAPI /Пример управления задержками и таймаутами при работе с оч...

Пример управления задержками и таймаутами при работе с очередями

Модуль 4: FastAPI
15 уровень , 6 лекция
Открыта

Таймауты — это как будильник, который говорит нам, что пора прекращать ждать. Если задача выполняется слишком долго, нам нужно уметь её отменить или обработать как-то иначе. В Celery и RabbitMQ можно настроить таймауты для задач и потоков обработки сообщений, чтобы было проще управлять временем выполнения.

Настройка таймаутов в Celery

Celery предоставляет встроенный механизм для установки таймаутов задач. Эти таймауты позволяют ограничить время выполнения задачи или ожидание её результата.

Пример настройки таймаута для задачи:


from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

# Конфигурация Celery
app.conf.task_time_limit = 15  # Максимальное время выполнения задачи (в секундах)
app.conf.task_soft_time_limit = 10  # Мягкий лимит времени выполнения (в секундах)

@app.task
def long_running_task():
    import time
    time.sleep(12)  # Задача, которая спит 12 секунд
    return "Done!"

Здесь task_soft_time_limit — это мягкий таймаут, который посылает сигнал задаче. Она может обработать этот сигнал и завершиться корректно. А task_time_limit уже жестко останавливает задачу, если она превышает лимит.

Что произойдет при превышении лимита?

Если задача превысит task_soft_time_limit, она получит исключение SoftTimeLimitExceeded. Мы можем обработать это исключение.


from celery.exceptions import SoftTimeLimitExceeded

@app.task
def long_running_task_with_handling():
    try:
        import time
        time.sleep(15)
    except SoftTimeLimitExceeded:
        return "Task exceeded its soft time limit!"
    return "Task completed successfully!"

Таймауты в RabbitMQ

Чтобы задачи в очередях RabbitMQ не висели бесконечно, мы можем настроить TTL (Time-To-Live) для сообщений и очередей.

Пример настройки TTL для сообщений:


rabbitmqadmin declare queue name=example_queue arguments='{"x-message-ttl":60000}'

В этом примере сообщения в очереди example_queue будут удалены, если не будут обработаны в течение 60 секунд.

Пример настройки TTL для очереди:


rabbitmqadmin declare queue name=example_queue arguments='{"x-expires":60000}'

Здесь сама очередь будет уничтожена, если она не использовалась в течение 60 секунд.


Управление задержками в очередях

Когда ваша система перегружена, иногда лучше отправить задачи на выполнение с задержкой, пока ресурсы не освободятся. RabbitMQ и Celery поддерживают это.

Celery позволяет отправлять задачи с задержкой при помощи параметра eta (Estimated Time of Arrival) или countdown.

Пример использования countdown:


@app.task
def delayed_task():
    return "Task executed!"

# Запуск задачи через 30 секунд
delayed_task.apply_async(countdown=30)

Пример использования eta:


from datetime import datetime, timedelta

future_time = datetime.utcnow() + timedelta(seconds=45)
delayed_task.apply_async(eta=future_time)

Задержки в RabbitMQ

RabbitMQ позволяет реализовать задержки с помощью плагина для отложенных сообщений или через Dead-letter exchange (DLX).

Пример использования DLX для задержанных сообщений:

  1. Создайте DLX и основную очередь:

rabbitmqctl add_user user password
rabbitmqctl set_user_tags user administrator
rabbitmqctl set_permissions -p / user ".*" ".*" ".*"

rabbitmqadmin declare exchange name=dlx1 type=direct
rabbitmqadmin declare exchange name=main_exchange type=direct
rabbitmqadmin declare queue name=dlx_queue arguments='{"x-dead-letter-exchange":"main_exchange"}'
rabbitmqadmin declare queue name=main_queue
  1. Опубликуйте сообщение в DLX с TTL:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.basic_publish(
    exchange='',
    routing_key='dlx_queue',
    body='Delayed Message',
    properties=pika.BasicProperties(expiration='5000')  # TTL в миллисекундах
)
connection.close()

Сообщения будут перемещены в основную очередь после истечения их TTL.


Использование отложенных очередей

RabbitMQ поддерживает плагин rabbitmq_delayed_message_exchange, который можно использовать для создания отложенных очередей:

  1. Установите плагин:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  1. Создайте отложенную очередь:

rabbitmqadmin declare exchange name=delayed_exchange type=x-delayed-message arguments='{"x-delayed-type":"direct"}'
rabbitmqadmin declare queue name=delayed_queue
  1. Отправьте сообщение с задержкой:

channel.basic_publish(
    exchange='delayed_exchange',
    routing_key='',
    body='Delayed Task',
    properties=pika.BasicProperties(
        headers={"x-delay": 10000}  # Задержка в миллисекундах
    )
)

Примеры задержки выполнения задач

Предположим, у нас есть фоновые задачи, которые нужно выполнять раз в минуту, а не сразу. Мы можем использовать beat_schedule вместе с Celery.


from celery import Celery
from celery.schedules import crontab

app = Celery('tasks', broker='pyamqp://guest@localhost//')

app.conf.beat_schedule = {
    'run-every-1-minute': {
        'task': 'tasks.delayed_task',
        'schedule': crontab(minute='*'),
    },
}

Задержки в RabbitMQ для баланса нагрузки:

Когда сервер перегружен, можно перенаправить задачи на очередь с повышенным TTL, постепенно возвращая их в основную очередь.


# Опубликуем сообщение в очередь с TTL
channel.basic_publish(
    exchange='dlx_exchange',
    routing_key='delayed_routing_key',
    body='Deferred Task',
    properties=pika.BasicProperties(expiration='10000')  # Задержка
)

Итоги

Теперь вы знаете, как настраивать таймауты и создавать задержки для задач в асинхронных системах. Это полезно для контроля времени выполнения задач и управления нагрузкой на систему. Используйте Celery для гибкой настройки таймаутов в задачах и RabbitMQ для управления TTL и задержками в потоках данных. Каждая деталь имеет значение, когда вы создаете масштабируемую и надежную архитектуру.

1
Задача
Модуль 4: FastAPI, 15 уровень, 6 лекция
Недоступна
Установка таймаута для задачи в Celery
Установка таймаута для задачи в Celery
1
Задача
Модуль 4: FastAPI, 15 уровень, 6 лекция
Недоступна
Настройка TTL в RabbitMQ для сообщений
Настройка TTL в RabbitMQ для сообщений
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ