JavaRush /Курсы /Модуль 4: FastAPI /Как избежать дублирования задач при повторных отправках

Как избежать дублирования задач при повторных отправках

Модуль 4: FastAPI
15 уровень , 7 лекция
Открыта

Дублирование задач происходит, когда одна и та же задача отправляется в очередь более одного раза или обрабатывается несколько раз. На первый взгляд, кажется, что это мелочь, но на практике могут возникнуть довольно серьезные проблемы:

  1. Дополнительная нагрузка на систему: дублирующиеся задачи создают лишнюю нагрузку на очереди, воркеры и базу данных.
  2. Нарушение бизнес-логики: повторное выполнение задачи может привести к некорректным изменениям данных. Например, дважды отправленный счет или двойной списание денег с карты клиента.
  3. Сложность отладки: лишние копии задач могут запутать логи и затруднить диагностику проблем.

Вот вам пример из жизни. Представьте, что вы заказали пиццу через приложение, но из-за дубликатов задач система отправила два заказа вместо одного. Одна пицца, конечно, круто, но платить за две — не так уж весело, правда?

Для борьбы с дублированием задач в асинхронных системах применяют практику обеспечения идемпотентности. Идемпотентной называется операция, которую можно выполнять сколько угодно раз, но результат будет всегда одинаковым.

Пример из жизни: нажатие кнопки лифта. Нажмите один раз, или десять раз — кабина лифта от вашей активности быстрее не приедет.

Идемпотентность в задачах достигается использованием одного или нескольких подходов, которые рассмотрим ниже.


Использование уникальных идентификаторов задач

В Celery каждая задача автоматически получает уникальный идентификатор, который можно использовать для отслеживания её статуса и предотвращения повторного выполнения. Тем не менее, в реальных проектах нередко возникает необходимость в дополнительных идентификаторах — для учёта особенностей бизнес-логики или интеграции с внешними системами.

Как же происходит генерация уникальных Task ID?


from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

# Пример задачи с использованием уникального идентификатора
@app.task(bind=True)
def process_order(self, order_id):
    unique_task_id = f"task-{order_id}"
    if not is_task_processed(unique_task_id):
        save_task_status(unique_task_id)
        print(f"Processing order {order_id}")
        # Логика обработки заказа
    else:
        print(f"Order {order_id} was already processed!")

def is_task_processed(task_id):
    # Проверяем статус задачи в Redis или другой базе
    # Возвращаем True, если задача уже была выполнена
    pass

def save_task_status(task_id):
    # Сохраняем статус выполненной задачи в Redis или другой базе
    pass

Здесь мы создаем уникальный идентификатор задачи, привязанный к ID заказа. Если система видит, что задача с таким ID уже выполнена, она просто пропускает обработку.


Использование ключей Redis для обеспечения идемпотентности

Redis — отличный инструмент для хранения временных данных, таких как идентификаторы задач. Вы можете использовать его для проверки и предотвращения повторной обработки задач.

Пример с использованием ключей Redis


import redis

redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)

@app.task(bind=True)
def process_order(self, order_id):
    unique_task_id = f"task-{order_id}"
    
    if redis_client.setnx(unique_task_id, "processing"):
        redis_client.expire(unique_task_id, 3600)  # Устанавливаем тайм-аут
        print(f"Processing order {order_id}")
        # Логика обработки заказа
        redis_client.set(unique_task_id, "processed")
    else:
        print(f"Duplicate task detected for order {order_id}")

В этом коде мы используем метод setnx, чтобы установить ключ, если он не существует. Если ключ уже есть, значит, задача выполняется или уже выполнена.


Применение хэширования на основе данных задачи

Для задач, где входные данные уникальны, можно использовать хэширование для определения дубликатов. Например, хэшировать содержимое тела задачи и проверять, не выполнялась ли она ранее.

Пример создания хэша задач


import hashlib

@app.task(bind=True)
def process_order(self, order_data):
    task_hash = hashlib.md5(str(order_data).encode()).hexdigest()
    if not redis_client.setnx(task_hash, "processing"):
        print(f"Duplicate task detected for data: {order_data}")
        return
    redis_client.expire(task_hash, 3600)
    print(f"Processing order with data: {order_data}")
    # Логика обработки данных
    redis_client.set(task_hash, "processed")

Хэширование особенно полезно для задач с большим количеством параметров или данных, где простой ID не подходит.


Ограничение на повторные отправки задач

В Celery можно настроить ограничение на повторные отправки через retry и retry_policy. Это позволяет избежать бесконечного выполнения задач при ошибках.

Пример настройки retry


from celery.exceptions import MaxRetriesExceededError

@app.task(bind=True, max_retries=3)
def fetch_data(self, url):
    try:
        # Логика выполнения задачи
        print(f"Fetching data from {url}")
    except Exception as e:
        print(f"Error occurred: {e}")
        try:
            self.retry(countdown=5, exc=e)
        except MaxRetriesExceededError:
            print(f"Task failed after maximum retries: {self.request.id}")

В этом примере, если задача несколько раз сбоила, она больше не будет повторно отправляться в очередь.


Практические советы по предотвращению дублирования

  1. Используйте базу данных для отслеживания статуса задач: если Redis или ин-мемори базы не подходят, используйте стандартную базу данных для хранения выполненных задач.
  2. Настраивайте retry и таймауты: так вы сможете избежать вечного выполнения одних и тех же задач.
  3. Периодически чистите старые ключи и идентификаторы: например, в Redis может накапливаться множество ключей, которые занимают память.

Секрет успеха заключается в том, чтобы комбинировать различные подходы. Например, идентификаторы для проверки уникальности, плюс Redis для временного хранения, плюс хэширование для сложных задач. Используйте их в зависимости от требований вашего проекта.

Теперь у вас есть все инструменты, чтобы предотвратить дублирование задач, а ваши серверы и пользователи скажут вам спасибо!

1
Задача
Модуль 4: FastAPI, 15 уровень, 7 лекция
Недоступна
Уникальные идентификаторы задач в Celery
Уникальные идентификаторы задач в Celery
1
Задача
Модуль 4: FastAPI, 15 уровень, 7 лекция
Недоступна
Предотвращение дублирования с использованием Redis
Предотвращение дублирования с использованием Redis
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ