Давайте изучим, как связать Celery и RabbitMQ, чтобы построить настоящую асинхронную магию для обработки задач. RabbitMQ будет нашим брокером сообщений, который раздаёт задачи воркерам Celery. В результате мы получим систему, которая может масштабироваться, обрабатывать запросы на фоне и справляться с большими нагрузками.
Когда ваш сервер Web-приложения сталкивается с тяжёлыми задачами, например обработкой больших данных или отправкой тонны писем, он может начать "задыхаться". Мы хотим разгрузить сервер, отправляя эти задачи в фоновый режим. Сюда и вступает RabbitMQ.
RabbitMQ работает как посредник:
- Он принимает задачи от вашего приложения (продюсера).
- Хранит их в очередях.
- Воркеры Celery забирают задачи и выполняют их.
Это как доставка пиццы:
- FastAPI/Django — повар, который отдаёт пиццу курьеру.
- RabbitMQ — курьер, который везёт пиццу клиенту.
- Celery — клиент, который с радостью ест пиццу (выполняет задачу).
Установка RabbitMQ
Мы начнём с установки RabbitMQ, чтобы он стал частью нашего асинхронного стека.
Если у вас уже установлен Docker, установка RabbitMQ займёт всего несколько минут. Откройте терминал и выполните:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
- Порт
5672используется для общения RabbitMQ с Celery. - Порт
15672позволяет вам получить доступ к удобному веб-интерфейсу RabbitMQ.
После успешного запуска вы можете открыть браузер и перейти к http://localhost:15672. Логин: guest, пароль: guest.
Если Docker не ваш выбор, проверьте официальную документацию RabbitMQ для установки на вашем компьютере.
Настройка Celery для использования RabbitMQ
Теперь мы свяжем Celery с RabbitMQ в нашем проекте.
Шаг 1: установка зависимостей
Сначала установим Celery и библиотеку kombu, которая обеспечивает связь с RabbitMQ:
pip install celery kombu
Если вы работаете с Django, убедитесь, что Celery уже интегрирован в ваш проект, как это обсуждалось в предыдущих лекциях.
Шаг 2: настройка конфигурации Celery
Теперь создадим или обновим файл конфигурации Celery (например, celery.py в вашем проекте). На этот раз мы укажем RabbitMQ как брокер сообщений.
Пример конфигурации:
from celery import Celery
# Создаём приложение Celery
app = Celery("example_project")
# Указываем RabbitMQ как брокер сообщений
app.conf.broker_url = "amqp://guest:guest@localhost:5672//"
# Дополнительные настройки (опционально)
app.conf.result_backend = "rpc://"
app.conf.task_serializer = "json" # Для сериализации данных задач
app.conf.accept_content = ["json"] # Принимаемые форматы данных
app.conf.timezone = "UTC" # Зона времени
amqp://guest:guest@localhost:5672//— строка подключения к RabbitMQ. Здесь используются стандартный логин и пароль гостя. Если вы настроили свои данные для доступа, замените их.
Шаг 3: объявление простой задачи
Добавим простую задачу, чтобы протестировать работу Celery через RabbitMQ. Создайте файл tasks.py:
from celery import shared_task
@shared_task
def add(x, y):
return x + y
Шаг 4: запуск RabbitMQ и Celery
- Убедитесь, что RabbitMQ работает. В случае с Docker, это легко проверить командой:
docker ps - Запустите Celery воркеры:
celery -A example_project worker --loglevel=infoЗамените
example_projectна имя вашего проекта.
Проверка работы Celery и RabbitMQ
Теперь, когда всё настроено, вызовем нашу задачу add из Python Shell (или через Django):
from tasks import add
result = add.delay(3, 5)
print(f"Task ID: {result.id}")
print(f"Task Result: {result.get()}")
Посмотрите на логи в терминале, где запущен воркер Celery. Вы должны увидеть обработку задачи. Это означает, что RabbitMQ успешно получил задачу и передал её Celery.
Преимущества использования RabbitMQ с Celery
RabbitMQ предоставляет несколько преимуществ, которые делают его идеальным брокером сообщений для Celery:
- Сохранность задач. RabbitMQ надёжно хранит задачи даже при сбоях в сети.
- Масштабируемость. RabbitMQ поддерживает десятки очередей, которые могут обрабатывать Celery-воркеры.
- Долговечность данных. Вы можете настроить "durable" очереди, чтобы задачи сохранялись даже между перезапусками RabbitMQ.
- Поддержка высоких нагрузок. RabbitMQ хорошо справляется с передачей большого количества задач одновременно.
Пример более сложной задачи
Давайте создадим более сложную задачу, например, отправку электронной почты:
import time
from celery import shared_task
@shared_task
def send_email(recipient, subject, body):
print(f"Sending email to {recipient} with subject '{subject}'...")
time.sleep(5) # Имитируем долгую отправку
print("Email sent!")
Такую задачу можно использовать для реальных крупных проектов, где необходимо отправлять уведомления, отчёты или массовую рассылку писем.
Настройка очередей и маршрутов задач
RabbitMQ позволяет создавать разные очереди для разных задач. Например, можно отдельно обрабатывать задачи на отправку писем и задачи по обработке данных.
app.conf.task_routes = {
"tasks.send_email": {"queue": "email_queue"},
"tasks.add": {"queue": "math_queue"},
}
Теперь RabbitMQ будет использовать очереди email_queue и math_queue для маршрутизации задач. Это упрощает масштабирование, так как можно запускать воркеров для конкретных очередей.
Примечания о реальной разработке
- Если вы используете RabbitMQ в продакшене, обязательно настройте свои логины и пароли. По умолчанию используются "guest/guest", что небезопасно.
- Мониторьте производительность RabbitMQ, чтобы избежать перегрузки очередей. Используйте веб-интерфейс RabbitMQ для просмотра загрузки.
- В зависимости от задач, может потребоваться тонкая настройка конфигурации воркеров Celery (например, количество процессов, таймауты).
Теперь у вас есть полноценная интеграция Celery с RabbitMQ для обработки асинхронных задач. В следующей лекции мы продолжим создавать более сложные и реальные сценарии, используя силы асинхронных систем.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ