JavaRush /Курси /Модуль 4: FastAPI /Створення та виконання фонових завдань через Celery

Створення та виконання фонових завдань через Celery

Модуль 4: FastAPI
Рівень 13 , Лекція 4
Відкрита

У Celery завдання — це функції, які ви анотуєте спеціальним декоратором @celery.task.

Ось приклад найпростішого завдання, яке повертає суму двох чисел:


# tasks.py
from celery import Celery

app = Celery('example_app', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

У цьому прикладі:

  • Ми імпортували Celery і створили екземпляр додатка через Celery('example_app'), вказавши брокера повідомлень (Redis).
  • Ми визначили функцію add(x, y) і зареєстрували її як завдання за допомогою декоратора @app.task.

Не забувайте структурувати проєкт! Зазвичай створюється окремий файл tasks.py, де й розміщуються функції завдань.

Хороша структура = менше головного болю.


Виконання завдань

Щоб відправити завдання на виконання (тобто поставити її в чергу Celery), використовуйте метод .delay():


# Відправляємо завдання на виконання
result = add.delay(4, 6)

# Результат виконання *можна* отримати через result.get() (якщо потрібно)
print(result.get())  # Вивід: 10

Зверніть увагу, що виклик add.delay() не блокує виконання коду. Завдання створюється і відправляється в чергу, а Celery її обробляє незалежно.


Виконання завдань з пріоритетами

Celery дозволяє виставляти пріоритет завданням, щоб важливі оброблялися швидше. Ось як це робиться:

  1. Налаштовуємо пріоритети черг у конфігурації Celery:
    
    # celeryconfig.py
    task_queues = {
        # Вказуємо ім'я черги й рівні пріоритету
        'default': {'exchange': 'default', 'routing_key': 'default', 'priority_levels': 10},
    }
    
  2. Передаємо пріоритет при виклику завдання:
    
    # Завдання з високим пріоритетом
    add.apply_async((5, 7), priority=9)
    

    Тут priority=9 вказує на високий пріоритет (чим більше число, тим вищий пріоритет).


Робота з результатами завдань

Коли наші завдання виконуються, їхні результати можна зберігати, обробляти або ігнорувати.

Celery надає механізм збереження результатів через backend (наприклад, Redis). Ось приклад:

Увімкнемо збереження результатів через Redis:


# celeryconfig.py
result_backend = 'redis://localhost:6379/1'

У цьому випадку результати виконання завдань зберігаються в Redis (у базі 1). Тепер можна отримати результат виконання:


result = add.delay(10, 20)

# Чекаємо виконання й отримуємо результат
print(result.get(timeout=10))  # Вивід: 30

Виклик result.get() блокує виконання коду до завершення завдання, що суперечить принципу асинхронності.

Використовуйте цей підхід тільки якщо це необхідно.


Практичний приклад: відправлення email

Давайте реалізуємо класичний сценарій: відправлення підтвердження про реєстрацію користувача.

Створимо завдання, яке відправляє email асинхронно.

  1. Переконаємось, що в нас встановлено SMTP-сервер. Для тестування можна використати MailHog.
  2. Створимо завдання в tasks.py:
    
    # tasks.py
    from celery import Celery
    from time import sleep
    
    app = Celery('email_app', broker='redis://localhost:6379/0')
    
    @app.task
    def send_email(email, subject, body):
        """
        Приклад емулювання відправлення листа (замість реального SMTP попрацюємо з 'print')
        """
        sleep(2)  # Імітуємо затримку відправлення
        print(f"Повідомлення відправлено: {email}, {subject}, {body}")
        return f"Email відправлено для {email}"
    

    Завдання send_email імітує відправлення листа.

  3. Викликаємо завдання з нашого додатка:
    
    # Десь у вашому коді (наприклад, у view або endpoint)
    from tasks import send_email
    
    # Відправляємо email у фоновому завданні
    result = send_email.delay(
        email="user@example.com",
        subject="Ласкаво просимо!",
        body="Дякуємо за реєстрацію на нашому сайті!"
    )
    
    # Можна також отримати результат, якщо це потрібно
    print(result.get())  # Вивід: "Email відправлено для user@example.com"
    

    Цей приклад показує, як просто зв'язати бізнес-логіку додатка з Celery, розвантаживши головний процес. Асинхронність FTW (for the win)!


Ключові помилки при роботі з Celery

  1. Неправильна конфігурація брокера повідомлень. Якщо ви забули вказати правильну адресу брокера (broker_url), завдання просто «зникнуть» у небутті. Завжди перевіряйте конфігурацію.
  2. Очікування result.get() для тривалих завдань. Мета Celery — асинхронність. Не блокуй додаток цим викликом, якщо це не вкрай необхідно.
  3. Відсутність моніторингу. Без інструментів на кшталт Flower або логів важко помітити, якщо щось пішло не так.

Ускладнюємо завдання

  1. Давайте додамо завдання для обчислення факторіалу, щоб продемонструвати обчислення:
    
    @app.task
    def calculate_factorial(n):
        result = 1
        for i in range(1, n + 1):
            result *= i
            sleep(0.1)  # Сповільнюємо для імітації довгої операції
        return result
    
  2. Використаємо її:
    
    # Запускаємо завдання
    factorial_result = calculate_factorial.delay(10)
    
    # Працюємо далі паралельно... (чекаємо, але не блокуємо!)
    print("Фонова задача запущена!")
    

Тепер ваш Celery вміє робити корисні штуки: від надсилання листів до обчислень.

У наступних лекціях ви зможете покращити цей функціонал, додавши моніторинг і планування завдань за допомогою Celery Beat.

3
Опитування
Введення в Celery, рівень 13, лекція 4
Недоступний
Введення в Celery
Введення в Celery
Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ