Таймаути — це як будильник, який каже нам, що пора припиняти чекати. Якщо задача виконується надто довго, треба вміти її скасувати або обробити інакше. У Celery і RabbitMQ можна налаштувати таймаути для задач і потоків обробки повідомлень, щоб простіше керувати часом виконання.
Налаштування таймаутів у Celery
Celery надає вбудований механізм для встановлення таймаутів задач. Ці таймаути дозволяють обмежити час виконання задачі або очікування її результату.
Приклад налаштування таймауту для задачі:
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
# Конфігурація Celery
app.conf.task_time_limit = 15 # Максимальний час виконання задачі (в секундах)
app.conf.task_soft_time_limit = 10 # М'який ліміт часу виконання (в секундах)
@app.task
def long_running_task():
import time
time.sleep(12) # Задача, яка спить 12 секунд
return "Done!"
Тут task_soft_time_limit — це м'який таймаут, який посилає сигнал задачі. Вона може обробити цей сигнал і коректно завершитися. А task_time_limit жорстко зупиняє задачу, якщо вона перевищує ліміт.
Що станеться при перевищенні ліміту?
Якщо задача перевищить task_soft_time_limit, вона отримає виключення SoftTimeLimitExceeded. Ми можемо обробити це виключення.
from celery.exceptions import SoftTimeLimitExceeded
@app.task
def long_running_task_with_handling():
try:
import time
time.sleep(15)
except SoftTimeLimitExceeded:
return "Task exceeded its soft time limit!"
return "Task completed successfully!"
Таймаути в RabbitMQ
Щоб задачі в чергах RabbitMQ не зависали нескінченно, ми можемо налаштувати TTL (Time-To-Live) для повідомлень і черг.
Приклад налаштування TTL для повідомлень:
rabbitmqadmin declare queue name=example_queue arguments='{"x-message-ttl":60000}'
У цьому прикладі повідомлення в черзі example_queue будуть видалені, якщо не будуть оброблені протягом 60 секунд.
Приклад налаштування TTL для черги:
rabbitmqadmin declare queue name=example_queue arguments='{"x-expires":60000}'
Тут сама черга буде знищена, якщо вона не використовувалася протягом 60 секунд.
Керування затримками в чергах
Коли ваша система перевантажена, іноді краще відправити задачі на виконання з затримкою, поки ресурси не звільняться. RabbitMQ і Celery це підтримують.
Celery дозволяє відправляти задачі з затримкою за допомогою параметра eta (Estimated Time of Arrival) або countdown.
Приклад використання countdown:
@app.task
def delayed_task():
return "Task executed!"
# Запуск задачі через 30 секунд
delayed_task.apply_async(countdown=30)
Приклад використання eta:
from datetime import datetime, timedelta
future_time = datetime.utcnow() + timedelta(seconds=45)
delayed_task.apply_async(eta=future_time)
Затримки в RabbitMQ
RabbitMQ дозволяє реалізувати затримки за допомогою плагіна для відкладених повідомлень або через Dead-letter exchange (DLX).
Приклад використання DLX для відкладених повідомлень:
- Створіть DLX і основну чергу:
rabbitmqctl add_user user password
rabbitmqctl set_user_tags user administrator
rabbitmqctl set_permissions -p / user ".*" ".*" ".*"
rabbitmqadmin declare exchange name=dlx1 type=direct
rabbitmqadmin declare exchange name=main_exchange type=direct
rabbitmqadmin declare queue name=dlx_queue arguments='{"x-dead-letter-exchange":"main_exchange"}'
rabbitmqadmin declare queue name=main_queue
- Опублікуйте повідомлення в DLX з TTL:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.basic_publish(
exchange='',
routing_key='dlx_queue',
body='Delayed Message',
properties=pika.BasicProperties(expiration='5000') # TTL в мілісекундах
)
connection.close()
Повідомлення будуть переміщені в основну чергу після закінчення їхнього TTL.
Використання відкладених черг
RabbitMQ підтримує плагін rabbitmq_delayed_message_exchange, який можна використати для створення відкладених черг:
- Встановіть плагін:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- Створіть відкладену чергу:
rabbitmqadmin declare exchange name=delayed_exchange type=x-delayed-message arguments='{"x-delayed-type":"direct"}'
rabbitmqadmin declare queue name=delayed_queue
- Відправте повідомлення із затримкою:
channel.basic_publish(
exchange='delayed_exchange',
routing_key='',
body='Delayed Task',
properties=pika.BasicProperties(
headers={"x-delay": 10000} # Затримка в мілісекундах
)
)
Приклади затримки виконання задач
Припустимо, у нас є фонові задачі, які треба виконувати раз на хвилину, а не одразу. Ми можемо використовувати beat_schedule разом з Celery.
from celery import Celery
from celery.schedules import crontab
app = Celery('tasks', broker='pyamqp://guest@localhost//')
app.conf.beat_schedule = {
'run-every-1-minute': {
'task': 'tasks.delayed_task',
'schedule': crontab(minute='*'),
},
}
Затримки в RabbitMQ для балансування навантаження:
Коли сервер перевантажений, можна перенаправити задачі в чергу з підвищеним TTL, поступово повертаючи їх у основну чергу.
# Опублікуємо повідомлення в чергу з TTL
channel.basic_publish(
exchange='dlx_exchange',
routing_key='delayed_routing_key',
body='Deferred Task',
properties=pika.BasicProperties(expiration='10000') # Затримка
)
Підсумки
Тепер ви знаєте, як налаштовувати таймаути і створювати затримки для задач в асинхронних системах. Це корисно для контролю часу виконання задач і керування навантаженням на систему. Використовуйте Celery для гнучкого налаштування таймаутів у задачах і RabbitMQ для керування TTL і затримками в потоках даних. Кожна деталь має значення, коли ви створюєте масштабовану і надійну архітектуру.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ