JavaRush /Курси /Модуль 4: FastAPI /Приклад керування затримками та таймаутами при роботі з ч...

Приклад керування затримками та таймаутами при роботі з чергами

Модуль 4: FastAPI
Рівень 15 , Лекція 6
Відкрита

Таймаути — це як будильник, який каже нам, що пора припиняти чекати. Якщо задача виконується надто довго, треба вміти її скасувати або обробити інакше. У Celery і RabbitMQ можна налаштувати таймаути для задач і потоків обробки повідомлень, щоб простіше керувати часом виконання.

Налаштування таймаутів у Celery

Celery надає вбудований механізм для встановлення таймаутів задач. Ці таймаути дозволяють обмежити час виконання задачі або очікування її результату.

Приклад налаштування таймауту для задачі:


from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

# Конфігурація Celery
app.conf.task_time_limit = 15  # Максимальний час виконання задачі (в секундах)
app.conf.task_soft_time_limit = 10  # М'який ліміт часу виконання (в секундах)

@app.task
def long_running_task():
    import time
    time.sleep(12)  # Задача, яка спить 12 секунд
    return "Done!"

Тут task_soft_time_limit — це м'який таймаут, який посилає сигнал задачі. Вона може обробити цей сигнал і коректно завершитися. А task_time_limit жорстко зупиняє задачу, якщо вона перевищує ліміт.

Що станеться при перевищенні ліміту?

Якщо задача перевищить task_soft_time_limit, вона отримає виключення SoftTimeLimitExceeded. Ми можемо обробити це виключення.


from celery.exceptions import SoftTimeLimitExceeded

@app.task
def long_running_task_with_handling():
    try:
        import time
        time.sleep(15)
    except SoftTimeLimitExceeded:
        return "Task exceeded its soft time limit!"
    return "Task completed successfully!"

Таймаути в RabbitMQ

Щоб задачі в чергах RabbitMQ не зависали нескінченно, ми можемо налаштувати TTL (Time-To-Live) для повідомлень і черг.

Приклад налаштування TTL для повідомлень:


rabbitmqadmin declare queue name=example_queue arguments='{"x-message-ttl":60000}'

У цьому прикладі повідомлення в черзі example_queue будуть видалені, якщо не будуть оброблені протягом 60 секунд.

Приклад налаштування TTL для черги:


rabbitmqadmin declare queue name=example_queue arguments='{"x-expires":60000}'

Тут сама черга буде знищена, якщо вона не використовувалася протягом 60 секунд.


Керування затримками в чергах

Коли ваша система перевантажена, іноді краще відправити задачі на виконання з затримкою, поки ресурси не звільняться. RabbitMQ і Celery це підтримують.

Celery дозволяє відправляти задачі з затримкою за допомогою параметра eta (Estimated Time of Arrival) або countdown.

Приклад використання countdown:


@app.task
def delayed_task():
    return "Task executed!"

# Запуск задачі через 30 секунд
delayed_task.apply_async(countdown=30)

Приклад використання eta:


from datetime import datetime, timedelta

future_time = datetime.utcnow() + timedelta(seconds=45)
delayed_task.apply_async(eta=future_time)

Затримки в RabbitMQ

RabbitMQ дозволяє реалізувати затримки за допомогою плагіна для відкладених повідомлень або через Dead-letter exchange (DLX).

Приклад використання DLX для відкладених повідомлень:

  1. Створіть DLX і основну чергу:

rabbitmqctl add_user user password
rabbitmqctl set_user_tags user administrator
rabbitmqctl set_permissions -p / user ".*" ".*" ".*"

rabbitmqadmin declare exchange name=dlx1 type=direct
rabbitmqadmin declare exchange name=main_exchange type=direct
rabbitmqadmin declare queue name=dlx_queue arguments='{"x-dead-letter-exchange":"main_exchange"}'
rabbitmqadmin declare queue name=main_queue
  1. Опублікуйте повідомлення в DLX з TTL:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.basic_publish(
    exchange='',
    routing_key='dlx_queue',
    body='Delayed Message',
    properties=pika.BasicProperties(expiration='5000')  # TTL в мілісекундах
)
connection.close()

Повідомлення будуть переміщені в основну чергу після закінчення їхнього TTL.


Використання відкладених черг

RabbitMQ підтримує плагін rabbitmq_delayed_message_exchange, який можна використати для створення відкладених черг:

  1. Встановіть плагін:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  1. Створіть відкладену чергу:

rabbitmqadmin declare exchange name=delayed_exchange type=x-delayed-message arguments='{"x-delayed-type":"direct"}'
rabbitmqadmin declare queue name=delayed_queue
  1. Відправте повідомлення із затримкою:

channel.basic_publish(
    exchange='delayed_exchange',
    routing_key='',
    body='Delayed Task',
    properties=pika.BasicProperties(
        headers={"x-delay": 10000}  # Затримка в мілісекундах
    )
)

Приклади затримки виконання задач

Припустимо, у нас є фонові задачі, які треба виконувати раз на хвилину, а не одразу. Ми можемо використовувати beat_schedule разом з Celery.


from celery import Celery
from celery.schedules import crontab

app = Celery('tasks', broker='pyamqp://guest@localhost//')

app.conf.beat_schedule = {
    'run-every-1-minute': {
        'task': 'tasks.delayed_task',
        'schedule': crontab(minute='*'),
    },
}

Затримки в RabbitMQ для балансування навантаження:

Коли сервер перевантажений, можна перенаправити задачі в чергу з підвищеним TTL, поступово повертаючи їх у основну чергу.


# Опублікуємо повідомлення в чергу з TTL
channel.basic_publish(
    exchange='dlx_exchange',
    routing_key='delayed_routing_key',
    body='Deferred Task',
    properties=pika.BasicProperties(expiration='10000')  # Затримка
)

Підсумки

Тепер ви знаєте, як налаштовувати таймаути і створювати затримки для задач в асинхронних системах. Це корисно для контролю часу виконання задач і керування навантаженням на систему. Використовуйте Celery для гнучкого налаштування таймаутів у задачах і RabbitMQ для керування TTL і затримками в потоках даних. Кожна деталь має значення, коли ви створюєте масштабовану і надійну архітектуру.

Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ