JavaRush /Курсы /Модуль 4: FastAPI /Использование RabbitMQ для обработки фоновых задач

Использование RabbitMQ для обработки фоновых задач

Модуль 4: FastAPI
12 уровень , 8 лекция
Открыта

Сегодняшние веб-приложения нередко сталкиваются с задачами, выполнение которых может занять значительное время. Например:

  • Отправка подтверждения по электронной почте при регистрации.
  • Генерация отчетов.
  • Обработка больших данных.
  • Интеграция с третьими сервисами, где задержки неизбежны (например, платежными шлюзами).

Фоновые задачи — это как нанять команду помощников, которая забирает "долгие" задачи из ваших плеч и выполняет их в стороннем сервисе (в данном случае RabbitMQ). Пользовательская часть вашего приложения при этом остается быстрой и отзывчивой.


Настройка RabbitMQ для фоновых задач

Для начала создадим структуру приложения. В этом примере мы реализуем сценарий отправки электронных писем. Это классическая задача, которую часто выносят на фон, чтобы она не "тормозила" интерфейс пользователя.

Для работы с фоновой обработкой задач через RabbitMQ мы будем использовать библиотеку Celery. Это мощный инструмент, который легко интегрируется с RabbitMQ.


pip install celery
pip install "celery[rabbitmq]"

Настройка Celery с RabbitMQ

Создадим новый файл celery_app.py для настройки Celery в нашем FastAPI-приложении.


from celery import Celery

# Настраиваем приложение Celery и подключаем RabbitMQ в качестве брокера
celery_app = Celery(
    "tasks",  # имя приложения Celery
    broker="amqp://guest:guest@localhost:5672//",  # URL RabbitMQ
    backend="rpc://"  # Хранилище результатов выполнения задач
)

# Пример конфигурации Celery
celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],  # Поддерживаем только JSON формат
    result_serializer="json",
    timezone="UTC",  # Ваш часовой пояс
    enable_utc=True,
)

В этом коде мы подключили Celery к локальному RabbitMQ с использованием стандартных настроек (гость/гость). Также настроили сериализацию сообщений для работы в формате JSON.


Реализация фоновой задачи

Теперь мы создадим задачу для отправки электронной почты.

Создадим файл tasks.py, где будем реализовывать наши задачи.


from time import sleep
from celery_app import celery_app

@celery_app.task
def send_email(to_email: str, subject: str, message: str):
    # Симуляция длительного процесса
    print(f"Начинаем отправку письма на {to_email}...")
    sleep(5)  # Задержка для демонстрации
    print(f"Письмо отправлено на {to_email} с темой '{subject}'!")
    return f"Email отправлен на {to_email}"

Эта задача будет симулировать отправку электронной почты, используя задержку в 5 секунд (sleep). В реальном проекте вместо этой задержки вы можете использовать, например, библиотеку smtplib или сторонний сервис для отправки писем.


Интеграция фоновых задач в FastAPI

Пришло время связать это с нашим приложением FastAPI. Добавим возможность отправить письмо через API, поручив фоновой задаче обработку.

Основной файл приложения


from fastapi import FastAPI, BackgroundTasks
from tasks import send_email

app = FastAPI()

@app.post("/send-email/")
async def api_send_email(to_email: str, subject: str, message: str):
    # Запускаем фоновую задачу
    send_email.apply_async(args=[to_email, subject, message])
    return {"message": "Задача поставлена в очередь"}

Задача send_email добавляется в очередь RabbitMQ с помощью метода apply_async. Это заставляет Celery запланировать выполнение задачи, в то время как FastAPI немедленно отправляет ответ клиенту.


Запуск и тестирование задачи

Убедитесь, что RabbitMQ запущен. Если вы используете Docker, можно запустить его так:


docker run -d --hostname rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Celery-воркеры — это те "помощники", которые выполняют задачи, поступающие в очередь. Запустим их:


celery -A celery_app worker --loglevel=info

Для тестирования используем любой HTTP-клиент (например, Postman или cURL):


curl -X POST "http://127.0.0.1:8000/send-email/" \
-H "Content-Type: application/json" \
-d '{"to_email":"test@example.com", "subject":"RabbitMQ", "message":"Этот курс великолепен!"}'

После выполнения запроса вы увидите в консоли Celery-воркера что-то вроде:


[2023-11-01 12:00:34,123: INFO/MainProcess] Received task: send_email[1234abcd-5678]
Начинаем отправку письма на test@example.com...
Письмо отправлено на test@example.com с темой 'RabbitMQ'!

Управление производительностью

  • Распределение нагрузки

Если задачи выполняются слишком медленно, вы можете масштабировать воркеры:


celery -A celery_app worker --loglevel=info --concurrency=4

Этот пример запускает 4 воркера для параллельной обработки задач. В реальных приложениях значение concurrency можно увеличить, в зависимости от количества ядер процессора и доступных ресурсов.

  • Ограничение повторных попыток

Celery по умолчанию будет пытаться повторно выполнить задачу, если она завершится с ошибкой. Вы можете настроить это:


@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_email(self, to_email: str, subject: str, message: str):
    try:
        print(f"Отправляем письмо на {to_email}...")
        # Логика отправки
        raise Exception("Ошибка во время отправки!")  # Симуляция сбоя
    except Exception as exc:
        print(f"Сбой: {exc}")
        raise self.retry(exc=exc)

Этот код задает максимум 3 попытки с задержкой 60 секунд между ними.


Применение на практике: где это используется?

RabbitMQ и Celery активно применяются для фоновой обработки задач в реальных проектах. Некоторые примеры:

  • E-commerce: генерация счетов, отправка уведомлений клиентам.
  • Социальные сети: обработка медиа (например, сжатие изображений или видео).
  • Сайты бронирований: отправка подтверждений резерваций.
  • Финансовый сектор: обработка транзакций и заявок от пользователей.

RabbitMQ помогает минимизировать задержки для пользователя в подобных сценариях и делает приложение более масштабируемым.


Это только начало вашего пути в мир фоновых задач! Мы еще вернемся к Celery в будущих лекциях, а пока можно насладиться новым уровнем производительности ваших приложений.

1
Задача
Модуль 4: FastAPI, 12 уровень, 8 лекция
Недоступна
Получение сообщения из очереди RabbitMQ
Получение сообщения из очереди RabbitMQ
1
Задача
Модуль 4: FastAPI, 12 уровень, 8 лекция
Недоступна
Интеграция RabbitMQ с Celery для обработки фоновых задач
Интеграция RabbitMQ с Celery для обработки фоновых задач
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ