JavaRush /Курси /Модуль 4: FastAPI /Як уникнути дублювання завдань при повторних відправлення...

Як уникнути дублювання завдань при повторних відправленнях

Модуль 4: FastAPI
Рівень 15 , Лекція 7
Відкрита

Дублювання завдань відбувається, коли те саме завдання надсилається в чергу більше одного разу або обробляється кілька разів. На перший погляд здається дрібницею, але на практиці можуть виникнути доволі серйозні проблеми:

  1. Додаткове навантаження на систему: дублюючіся завдання створюють зайве навантаження на черги, воркери та базу даних.
  2. Порушення бізнес-логіки: повторне виконання завдання може призвести до некоректних змін даних. Наприклад, двічі виставлений рахунок або подвійне списання грошей з карти клієнта.
  3. Складність налагодження: зайві копії завдань можуть заплутати логи і ускладнити діагностику проблем.

Ось приклад з життя. Уявіть, що ви замовили піцу через додаток, але через дублікати завдань система відправила два замовлення замість одного. Одна піца, звісно, круто, але платити за дві — не надто весело, правда?

Для боротьби з дублюванням завдань в асинхронних системах застосовують практику забезпечення ідемпотентності. Ідемпотентною називається операція, яку можна виконувати скільки завгодно разів, але результат завжди буде однаковим.

Приклад із життя: натискання кнопки ліфта. Натисни один раз або десять — кабіна ліфта від твоєї активності швидше не приїде.

Ідемпотентність у завданнях досягається використанням одного або кількох підходів, які розглянемо нижче.


Використання унікальних ідентифікаторів завдань

В Celery кожне завдання автоматично отримує унікальний ідентифікатор, який можна використовувати для відстеження його статусу і запобігання повторному виконанню. Однак у реальних проектах часто виникає потреба в додаткових ідентифікаторах — для урахування особливостей бізнес-логіки або інтеграції з зовнішніми системами.

Як же відбувається генерація унікальних Task ID?


from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

# Приклад завдання з використанням унікального ідентифікатора
@app.task(bind=True)
def process_order(self, order_id):
    unique_task_id = f"task-{order_id}"
    if not is_task_processed(unique_task_id):
        save_task_status(unique_task_id)
        print(f"Processing order {order_id}")
        # Логіка обробки замовлення
    else:
        print(f"Order {order_id} was already processed!")

def is_task_processed(task_id):
    # Перевіряємо статус завдання в Redis або іншій базі
    # Повертаємо True, якщо завдання вже було виконане
    pass

def save_task_status(task_id):
    # Зберігаємо статус виконаного завдання в Redis або іншій базі
    pass

Тут ми створюємо унікальний ідентифікатор завдання, прив'язаний до ID замовлення. Якщо система бачить, що завдання з таким ID вже виконане, вона просто пропускає обробку.


Використання ключів Redis для забезпечення ідемпотентності

Redis — відмінний інструмент для зберігання тимчасових даних, таких як ідентифікатори завдань. Ти можеш використовувати його для перевірки і запобігання повторної обробки завдань.

Приклад із використанням ключів Redis


import redis

redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)

@app.task(bind=True)
def process_order(self, order_id):
    unique_task_id = f"task-{order_id}"
    
    if redis_client.setnx(unique_task_id, "processing"):
        redis_client.expire(unique_task_id, 3600)  # Встановлюємо таймаут
        print(f"Processing order {order_id}")
        # Логіка обробки замовлення
        redis_client.set(unique_task_id, "processed")
    else:
        print(f"Duplicate task detected for order {order_id}")

У цьому коді ми використовуємо метод setnx, щоб встановити ключ, якщо він не існує. Якщо ключ уже є, значить завдання виконується або вже виконане.


Застосування хешування на основі даних завдання

Для завдань, де вхідні дані унікальні, можна використовувати хешування для визначення дублікатів. Наприклад, хешувати вміст тіла завдання і перевіряти, чи не виконувалось воно раніше.

Приклад створення хеша завдання


import hashlib

@app.task(bind=True)
def process_order(self, order_data):
    task_hash = hashlib.md5(str(order_data).encode()).hexdigest()
    if not redis_client.setnx(task_hash, "processing"):
        print(f"Duplicate task detected for data: {order_data}")
        return
    redis_client.expire(task_hash, 3600)
    print(f"Processing order with data: {order_data}")
    # Логіка обробки даних
    redis_client.set(task_hash, "processed")

Хешування особливо корисне для завдань з великою кількістю параметрів або даних, де простий ID не підходить.


Обмеження на повторні відправлення завдань

В Celery можна налаштувати обмеження на повторні відправлення через retry і retry_policy. Це дозволяє уникнути нескінченного виконання завдань при помилках.

Приклад налаштування retry


from celery.exceptions import MaxRetriesExceededError

@app.task(bind=True, max_retries=3)
def fetch_data(self, url):
    try:
        # Логіка виконання завдання
        print(f"Fetching data from {url}")
    except Exception as e:
        print(f"Error occurred: {e}")
        try:
            self.retry(countdown=5, exc=e)
        except MaxRetriesExceededError:
            print(f"Task failed after maximum retries: {self.request.id}")

У цьому прикладі, якщо завдання кілька разів зазнає помилки, воно більше не буде повторно відправлятись в чергу.


Практичні поради щодо запобігання дублюванню

  1. Використовуйте базу даних для відстеження статусу завдань: якщо Redis або in-memory бази не підходять, використовуйте стандартну базу даних для зберігання виконаних завдань.
  2. Налаштовуйте retry і таймаути: так ви зможете уникнути вічного виконання тих самих завдань.
  3. Періодично чистіть старі ключі та ідентифікатори: наприклад, в Redis може накопичуватися багато ключів, які займають пам'ять.

Секрет успіху в тому, щоб комбінувати різні підходи. Наприклад, ідентифікатори для перевірки унікальності, плюс Redis для тимчасового зберігання, плюс хешування для складних завдань. Використовуйте їх залежно від вимог вашого проєкту.

Тепер у тебе є всі інструменти, щоб запобігти дублюванню завдань — і твої сервери та користувачі скажуть тобі дякую!

Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ