JavaRush /Курсы /Модуль 4: FastAPI /Создание и выполнение фоновых задач через Celery

Создание и выполнение фоновых задач через Celery

Модуль 4: FastAPI
13 уровень , 4 лекция
Открыта

В Celery задачи — это функции, которые вы аннотируете специальным декоратором @celery.task.

Вот пример самой простой задачи, которая возвращает сумму двух чисел:


# tasks.py
from celery import Celery

app = Celery('example_app', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

В этом примере:

  • Мы импортировали Celery и создали экземпляр приложения через Celery('example_app'), указав брокера сообщений (Redis).
  • Мы определили функцию add(x, y) и зарегистрировали её как задачу с помощью декоратора @app.task.

Не забывайте структурировать проект! Обычно создаётся отдельный файл tasks.py, где и размещаются функции задач.

Хорошая структура = меньше головной боли.


Выполнение задач

Чтобы направить задачу на выполнение (то есть отправить её в Celery-очередь), используйте метод .delay():


# Отправляем задачу на выполнение
result = add.delay(4, 6)

# Результат выполнения *можно* получить через result.get() (если нужно)
print(result.get())  # Вывод: 10

Обратите внимание, что вызов add.delay() не блокирует выполнение кода. Задача создаётся и отправляется в очередь, а Celery её обрабатывает независимо.


Выполнение задач с приоритетами

Celery позволяет задавать приоритет задачам, чтобы важные задачи обрабатывались раньше остальных. Вот как это делается:

  1. Настраиваем приоритеты очередей в конфигурации Celery:
    
    # celeryconfig.py
    task_queues = {
        # Указываем имя очереди и уровни приоритета
        'default': {'exchange': 'default', 'routing_key': 'default', 'priority_levels': 10},
    }
    
  2. Передаём приоритет при вызове задачи:
    
    # Задача с высоким приоритетом
    add.apply_async((5, 7), priority=9)
    

    Здесь priority=9 указывает на высокий приоритет (чем выше число, тем выше приоритет).


Работа с результатами задач

Когда наши задачи выполняются, их результаты можно сохранять, обрабатывать или игнорировать.

Celery предоставляет механизм хранения результатов через backend (например, Redis). Вот пример:

Включим функцию сохранения результатов через Redis:


# celeryconfig.py
result_backend = 'redis://localhost:6379/1'

В этом случае результаты выполнения задач сохраняются в Redis (в базе 1). Теперь можно получить результат выполнения:


result = add.delay(10, 20)

# Ожидаем выполнение и получаем результат
print(result.get(timeout=10))  # Вывод: 30

Вызов result.get() блокирует выполнение кода до завершения задачи, что противоречит принципу асинхронности.

Используйте этот подход только если это необходимо.


Практический пример: отправка email

Давайте реализуем классический сценарий: отправка подтверждения о регистрации пользователя.

Мы создадим задачу, которая отправляет email асинхронно.

  1. Убедимся, что у нас установлен SMTP-сервер. Для тестов можно использовать MailHog.
  2. Создадим задачу в tasks.py:
    
    # tasks.py
    from celery import Celery
    from time import sleep
    
    app = Celery('email_app', broker='redis://localhost:6379/0')
    
    @app.task
    def send_email(email, subject, body):
        """
        Пример эмуляции отправки письма (вместо реального SMTP поработаем с 'print')
        """
        sleep(2)  # Имитируем задержку отправки
        print(f"Сообщение отправлено: {email}, {subject}, {body}")
        return f"Email отправлен для {email}"
    

    Задача send_email эмулирует отправку письма.

  3. Вызываем задачу из нашего приложения:
    
    # Где-то в вашем коде (например, во view или endpoint)
    from tasks import send_email
    
    # Отправляем email в фоновую задачу
    result = send_email.delay(
        email="user@example.com",
        subject="Добро пожаловать!",
        body="Спасибо за регистрацию на нашем сайте!"
    )
    
    # Можно также получить результат, если это нужно
    print(result.get())  # Вывод: "Email отправлен для user@example.com"
    

    Этот пример показывает, как легко связать бизнес-логику приложения с Celery, разгружая главный процесс. Асинхронность FTW (for the win)!


Ключевые ошибки при работе с Celery

  1. Неправильная конфигурация брокера сообщений. Если вы забыли настроить правильный адрес брокера (broker_url), задачи просто "исчезнут" в небытии. Всегда проверяйте конфигурацию.
  2. Ожидание result.get() для длительных задач. Цель Celery — асинхронность. Не блокируйте приложение этим вызовом, если в этом нет крайней необходимости.
  3. Отсутствие мониторинга. Без инструментов вроде Flower или логов сложно заметить, если что-то пошло не так.

Усложняем задачи

  1. Давайте добавим задачу с расчетом факториала, чтобы продемонстрировать вычисление:
    
    @app.task
    def calculate_factorial(n):
        result = 1
        for i in range(1, n + 1):
            result *= i
            sleep(0.1)  # Замедляем для эмуляции долгой операции
        return result
    
  2. Используем её:
    
    # Запускаем задачу
    factorial_result = calculate_factorial.delay(10)
    
    # Работаем дальше параллельно... (ждем, но не блокируем!)
    print("Фоновая задача запущена!")
    

Теперь ваш Celery умеет делать полезные штуки: от отправки писем до вычислений.

В следующих лекциях вы сможете улучшить этот функционал, добавив мониторинг и планирование задач с Celery Beat.

1
Задача
Модуль 4: FastAPI, 13 уровень, 4 лекция
Недоступна
Асинхронная отправка данных
Асинхронная отправка данных
1
Задача
Модуль 4: FastAPI, 13 уровень, 4 лекция
Недоступна
Длительное вычисление и обработка результата
Длительное вычисление и обработка результата
3
Опрос
Введение в Celery, 13 уровень, 4 лекция
Недоступен
Введение в Celery
Введение в Celery
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ