Сьогоднішні веб-застосунки часто зіштовхуються з задачами, виконання яких може зайняти чимало часу. Наприклад:
- Відправка підтвердження на електронну пошту при реєстрації.
- Генерація звітів.
- Обробка великих обсягів даних.
- Інтеграція з третіми сервісами, де затримки неминучі (наприклад, платіжні шлюзи).
Фонові задачі — це ніби найняти команду помічників, яка бере на себе «довгі» задачі з ваших плеч і виконує їх у зовнішньому сервісі (у нашому випадку RabbitMQ). Клієнтська частина вашого застосунку при цьому залишається швидкою та відзивчивою.
Налаштування RabbitMQ для фонових задач
Для початку створимо структуру застосунку. У цьому прикладі реалізуємо сценарій відправки електронних листів. Це класична задача, яку часто виносять у фон, щоб вона не «гальмувала» інтерфейс користувача.
Для роботи з фоновою обробкою задач через RabbitMQ ми будемо використовувати бібліотеку Celery. Це потужний інструмент, який легко інтегрується з RabbitMQ.
pip install celery
pip install "celery[rabbitmq]"
Налаштування Celery з RabbitMQ
Створимо новий файл celery_app.py для налаштування Celery у нашому FastAPI-застосунку.
from celery import Celery
# Налаштовуємо додаток Celery і підключаємо RabbitMQ як брокера
celery_app = Celery(
"tasks", # ім'я додатка Celery
broker="amqp://guest:guest@localhost:5672//", # URL RabbitMQ
backend="rpc://" # Сховище результатів виконання задач
)
# Приклад конфігурації Celery
celery_app.conf.update(
task_serializer="json",
accept_content=["json"], # Підтримуємо тільки формат JSON
result_serializer="json",
timezone="UTC", # Ваш часовий пояс
enable_utc=True,
)
У цьому коді ми підключили Celery до локального RabbitMQ з використанням стандартних налаштувань (guest/guest). Також налаштували серіалізацію повідомлень для роботи у форматі JSON.
Реалізація фонового завдання
Тепер ми створимо задачу для відправлення електронної пошти.
Створимо файл tasks.py, де реалізовуватимемо наші задачі.
from time import sleep
from celery_app import celery_app
@celery_app.task
def send_email(to_email: str, subject: str, message: str):
# Симуляція тривалого процесу
print(f"Починаємо відправку листа на {to_email}...")
sleep(5) # Затримка для демонстрації
print(f"Лист відправлено на {to_email} з темою '{subject}'!")
return f"Email відправлено на {to_email}"
Ця задача симулюватиме відправку електронної пошти, використовуючи затримку в 5 секунд (sleep). У реальному проєкті замість цієї затримки можна використовувати, наприклад, бібліотеку smtplib або сторонній сервіс для відправлення листів.
Інтеграція фонових задач у FastAPI
Настав час пов'язати це з нашим застосунком FastAPI. Додамо можливість відправити лист через API, доручивши фоновій задачі його обробку.
Головний файл застосунку
from fastapi import FastAPI, BackgroundTasks
from tasks import send_email
app = FastAPI()
@app.post("/send-email/")
async def api_send_email(to_email: str, subject: str, message: str):
# Запускаємо фонову задачу
send_email.apply_async(args=[to_email, subject, message])
return {"message": "Задачу поставлено в чергу"}
Задача send_email додається в чергу RabbitMQ за допомогою методу apply_async. Це змушує Celery запланувати виконання задачі, тоді як FastAPI миттєво відправляє відповідь клієнту.
Запуск і тестування задачі
Переконайтеся, що RabbitMQ запущений. Якщо ви використовуєте Docker, можна запустити його так:
docker run -d --hostname rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
Celery-воркери — це ті «помічники», які виконують задачі, що надходять у чергу. Запустимо їх:
celery -A celery_app worker --loglevel=info
Для тестування використаємо будь-який HTTP-клієнт (наприклад, Postman або cURL):
curl -X POST "http://127.0.0.1:8000/send-email/" \
-H "Content-Type: application/json" \
-d '{"to_email":"test@example.com", "subject":"RabbitMQ", "message":"Цей курс чудовий!"}'
Після виконання запиту ви побачите в консолі Celery-воркера щось на кшталт:
[2023-11-01 12:00:34,123: INFO/MainProcess] Received task: send_email[1234abcd-5678]
Починаємо відправку листа на test@example.com...
Лист відправлено на test@example.com з темою 'RabbitMQ'!
Керування продуктивністю
- Розподіл навантаження
Якщо задачі виконуються занадто повільно, можна масштабувати воркери:
celery -A celery_app worker --loglevel=info --concurrency=4
Цей приклад запускає 4 воркери для паралельної обробки задач. У реальних застосунках значення concurrency можна збільшити залежно від кількості ядер процесора й доступних ресурсів.
- Обмеження повторних спроб
За замовчуванням Celery буде намагатися повторно виконати задачу, якщо вона завершиться з помилкою. Це можна налаштувати:
@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_email(self, to_email: str, subject: str, message: str):
try:
print(f"Відправляємо лист на {to_email}...")
# Логіка відправлення
raise Exception("Помилка під час відправлення!") # Симуляція збою
except Exception as exc:
print(f"Збій: {exc}")
raise self.retry(exc=exc)
Цей код встановлює максимум 3 спроби з затримкою 60 секунд між ними.
Застосування на практиці: де це використовується?
RabbitMQ та Celery активно застосовуються для фонової обробки задач у реальних проєктах. Декілька прикладів:
- E-commerce: генерація рахунків, відправка сповіщень клієнтам.
- Соціальні мережі: обробка медіа (наприклад, стиснення зображень або відео).
- Сайти бронювань: відправка підтверджень резервувань.
- Фінансовий сектор: обробка транзакцій і заявок від користувачів.
RabbitMQ допомагає мінімізувати затримки для користувача в таких сценаріях і робить застосунок більш масштабованим.
Це лише початок вашого шляху у світ фонових задач! Ми ще повернемося до Celery у майбутніх лекціях, а поки можна насолоджуватися новим рівнем продуктивності ваших застосунків.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ