Таймауты — это как будильник, который говорит нам, что пора прекращать ждать. Если задача выполняется слишком долго, нам нужно уметь её отменить или обработать как-то иначе. В Celery и RabbitMQ можно настроить таймауты для задач и потоков обработки сообщений, чтобы было проще управлять временем выполнения.
Настройка таймаутов в Celery
Celery предоставляет встроенный механизм для установки таймаутов задач. Эти таймауты позволяют ограничить время выполнения задачи или ожидание её результата.
Пример настройки таймаута для задачи:
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
# Конфигурация Celery
app.conf.task_time_limit = 15 # Максимальное время выполнения задачи (в секундах)
app.conf.task_soft_time_limit = 10 # Мягкий лимит времени выполнения (в секундах)
@app.task
def long_running_task():
import time
time.sleep(12) # Задача, которая спит 12 секунд
return "Done!"
Здесь task_soft_time_limit — это мягкий таймаут, который посылает сигнал задаче. Она может обработать этот сигнал и завершиться корректно. А task_time_limit уже жестко останавливает задачу, если она превышает лимит.
Что произойдет при превышении лимита?
Если задача превысит task_soft_time_limit, она получит исключение SoftTimeLimitExceeded. Мы можем обработать это исключение.
from celery.exceptions import SoftTimeLimitExceeded
@app.task
def long_running_task_with_handling():
try:
import time
time.sleep(15)
except SoftTimeLimitExceeded:
return "Task exceeded its soft time limit!"
return "Task completed successfully!"
Таймауты в RabbitMQ
Чтобы задачи в очередях RabbitMQ не висели бесконечно, мы можем настроить TTL (Time-To-Live) для сообщений и очередей.
Пример настройки TTL для сообщений:
rabbitmqadmin declare queue name=example_queue arguments='{"x-message-ttl":60000}'
В этом примере сообщения в очереди example_queue будут удалены, если не будут обработаны в течение 60 секунд.
Пример настройки TTL для очереди:
rabbitmqadmin declare queue name=example_queue arguments='{"x-expires":60000}'
Здесь сама очередь будет уничтожена, если она не использовалась в течение 60 секунд.
Управление задержками в очередях
Когда ваша система перегружена, иногда лучше отправить задачи на выполнение с задержкой, пока ресурсы не освободятся. RabbitMQ и Celery поддерживают это.
Celery позволяет отправлять задачи с задержкой при помощи параметра eta (Estimated Time of Arrival) или countdown.
Пример использования countdown:
@app.task
def delayed_task():
return "Task executed!"
# Запуск задачи через 30 секунд
delayed_task.apply_async(countdown=30)
Пример использования eta:
from datetime import datetime, timedelta
future_time = datetime.utcnow() + timedelta(seconds=45)
delayed_task.apply_async(eta=future_time)
Задержки в RabbitMQ
RabbitMQ позволяет реализовать задержки с помощью плагина для отложенных сообщений или через Dead-letter exchange (DLX).
Пример использования DLX для задержанных сообщений:
- Создайте DLX и основную очередь:
rabbitmqctl add_user user password
rabbitmqctl set_user_tags user administrator
rabbitmqctl set_permissions -p / user ".*" ".*" ".*"
rabbitmqadmin declare exchange name=dlx1 type=direct
rabbitmqadmin declare exchange name=main_exchange type=direct
rabbitmqadmin declare queue name=dlx_queue arguments='{"x-dead-letter-exchange":"main_exchange"}'
rabbitmqadmin declare queue name=main_queue
- Опубликуйте сообщение в DLX с TTL:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.basic_publish(
exchange='',
routing_key='dlx_queue',
body='Delayed Message',
properties=pika.BasicProperties(expiration='5000') # TTL в миллисекундах
)
connection.close()
Сообщения будут перемещены в основную очередь после истечения их TTL.
Использование отложенных очередей
RabbitMQ поддерживает плагин rabbitmq_delayed_message_exchange, который можно использовать для создания отложенных очередей:
- Установите плагин:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- Создайте отложенную очередь:
rabbitmqadmin declare exchange name=delayed_exchange type=x-delayed-message arguments='{"x-delayed-type":"direct"}'
rabbitmqadmin declare queue name=delayed_queue
- Отправьте сообщение с задержкой:
channel.basic_publish(
exchange='delayed_exchange',
routing_key='',
body='Delayed Task',
properties=pika.BasicProperties(
headers={"x-delay": 10000} # Задержка в миллисекундах
)
)
Примеры задержки выполнения задач
Предположим, у нас есть фоновые задачи, которые нужно выполнять раз в минуту, а не сразу. Мы можем использовать beat_schedule вместе с Celery.
from celery import Celery
from celery.schedules import crontab
app = Celery('tasks', broker='pyamqp://guest@localhost//')
app.conf.beat_schedule = {
'run-every-1-minute': {
'task': 'tasks.delayed_task',
'schedule': crontab(minute='*'),
},
}
Задержки в RabbitMQ для баланса нагрузки:
Когда сервер перегружен, можно перенаправить задачи на очередь с повышенным TTL, постепенно возвращая их в основную очередь.
# Опубликуем сообщение в очередь с TTL
channel.basic_publish(
exchange='dlx_exchange',
routing_key='delayed_routing_key',
body='Deferred Task',
properties=pika.BasicProperties(expiration='10000') # Задержка
)
Итоги
Теперь вы знаете, как настраивать таймауты и создавать задержки для задач в асинхронных системах. Это полезно для контроля времени выполнения задач и управления нагрузкой на систему. Используйте Celery для гибкой настройки таймаутов в задачах и RabbitMQ для управления TTL и задержками в потоках данных. Каждая деталь имеет значение, когда вы создаете масштабируемую и надежную архитектуру.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ