Сегодняшние веб-приложения нередко сталкиваются с задачами, выполнение которых может занять значительное время. Например:
- Отправка подтверждения по электронной почте при регистрации.
- Генерация отчетов.
- Обработка больших данных.
- Интеграция с третьими сервисами, где задержки неизбежны (например, платежными шлюзами).
Фоновые задачи — это как нанять команду помощников, которая забирает "долгие" задачи из ваших плеч и выполняет их в стороннем сервисе (в данном случае RabbitMQ). Пользовательская часть вашего приложения при этом остается быстрой и отзывчивой.
Настройка RabbitMQ для фоновых задач
Для начала создадим структуру приложения. В этом примере мы реализуем сценарий отправки электронных писем. Это классическая задача, которую часто выносят на фон, чтобы она не "тормозила" интерфейс пользователя.
Для работы с фоновой обработкой задач через RabbitMQ мы будем использовать библиотеку Celery. Это мощный инструмент, который легко интегрируется с RabbitMQ.
pip install celery
pip install "celery[rabbitmq]"
Настройка Celery с RabbitMQ
Создадим новый файл celery_app.py для настройки Celery в нашем FastAPI-приложении.
from celery import Celery
# Настраиваем приложение Celery и подключаем RabbitMQ в качестве брокера
celery_app = Celery(
"tasks", # имя приложения Celery
broker="amqp://guest:guest@localhost:5672//", # URL RabbitMQ
backend="rpc://" # Хранилище результатов выполнения задач
)
# Пример конфигурации Celery
celery_app.conf.update(
task_serializer="json",
accept_content=["json"], # Поддерживаем только JSON формат
result_serializer="json",
timezone="UTC", # Ваш часовой пояс
enable_utc=True,
)
В этом коде мы подключили Celery к локальному RabbitMQ с использованием стандартных настроек (гость/гость). Также настроили сериализацию сообщений для работы в формате JSON.
Реализация фоновой задачи
Теперь мы создадим задачу для отправки электронной почты.
Создадим файл tasks.py, где будем реализовывать наши задачи.
from time import sleep
from celery_app import celery_app
@celery_app.task
def send_email(to_email: str, subject: str, message: str):
# Симуляция длительного процесса
print(f"Начинаем отправку письма на {to_email}...")
sleep(5) # Задержка для демонстрации
print(f"Письмо отправлено на {to_email} с темой '{subject}'!")
return f"Email отправлен на {to_email}"
Эта задача будет симулировать отправку электронной почты, используя задержку в 5 секунд (sleep). В реальном проекте вместо этой задержки вы можете использовать, например, библиотеку smtplib или сторонний сервис для отправки писем.
Интеграция фоновых задач в FastAPI
Пришло время связать это с нашим приложением FastAPI. Добавим возможность отправить письмо через API, поручив фоновой задаче обработку.
Основной файл приложения
from fastapi import FastAPI, BackgroundTasks
from tasks import send_email
app = FastAPI()
@app.post("/send-email/")
async def api_send_email(to_email: str, subject: str, message: str):
# Запускаем фоновую задачу
send_email.apply_async(args=[to_email, subject, message])
return {"message": "Задача поставлена в очередь"}
Задача send_email добавляется в очередь RabbitMQ с помощью метода apply_async. Это заставляет Celery запланировать выполнение задачи, в то время как FastAPI немедленно отправляет ответ клиенту.
Запуск и тестирование задачи
Убедитесь, что RabbitMQ запущен. Если вы используете Docker, можно запустить его так:
docker run -d --hostname rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
Celery-воркеры — это те "помощники", которые выполняют задачи, поступающие в очередь. Запустим их:
celery -A celery_app worker --loglevel=info
Для тестирования используем любой HTTP-клиент (например, Postman или cURL):
curl -X POST "http://127.0.0.1:8000/send-email/" \
-H "Content-Type: application/json" \
-d '{"to_email":"test@example.com", "subject":"RabbitMQ", "message":"Этот курс великолепен!"}'
После выполнения запроса вы увидите в консоли Celery-воркера что-то вроде:
[2023-11-01 12:00:34,123: INFO/MainProcess] Received task: send_email[1234abcd-5678]
Начинаем отправку письма на test@example.com...
Письмо отправлено на test@example.com с темой 'RabbitMQ'!
Управление производительностью
- Распределение нагрузки
Если задачи выполняются слишком медленно, вы можете масштабировать воркеры:
celery -A celery_app worker --loglevel=info --concurrency=4
Этот пример запускает 4 воркера для параллельной обработки задач. В реальных приложениях значение concurrency можно увеличить, в зависимости от количества ядер процессора и доступных ресурсов.
- Ограничение повторных попыток
Celery по умолчанию будет пытаться повторно выполнить задачу, если она завершится с ошибкой. Вы можете настроить это:
@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_email(self, to_email: str, subject: str, message: str):
try:
print(f"Отправляем письмо на {to_email}...")
# Логика отправки
raise Exception("Ошибка во время отправки!") # Симуляция сбоя
except Exception as exc:
print(f"Сбой: {exc}")
raise self.retry(exc=exc)
Этот код задает максимум 3 попытки с задержкой 60 секунд между ними.
Применение на практике: где это используется?
RabbitMQ и Celery активно применяются для фоновой обработки задач в реальных проектах. Некоторые примеры:
- E-commerce: генерация счетов, отправка уведомлений клиентам.
- Социальные сети: обработка медиа (например, сжатие изображений или видео).
- Сайты бронирований: отправка подтверждений резерваций.
- Финансовый сектор: обработка транзакций и заявок от пользователей.
RabbitMQ помогает минимизировать задержки для пользователя в подобных сценариях и делает приложение более масштабируемым.
Это только начало вашего пути в мир фоновых задач! Мы еще вернемся к Celery в будущих лекциях, а пока можно насладиться новым уровнем производительности ваших приложений.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ