JavaRush /Курсы /Модуль 4: FastAPI /Интеграция Celery с RabbitMQ для обработки задач

Интеграция Celery с RabbitMQ для обработки задач

Модуль 4: FastAPI
13 уровень , 3 лекция
Открыта

Давайте изучим, как связать Celery и RabbitMQ, чтобы построить настоящую асинхронную магию для обработки задач. RabbitMQ будет нашим брокером сообщений, который раздаёт задачи воркерам Celery. В результате мы получим систему, которая может масштабироваться, обрабатывать запросы на фоне и справляться с большими нагрузками.

Когда ваш сервер Web-приложения сталкивается с тяжёлыми задачами, например обработкой больших данных или отправкой тонны писем, он может начать "задыхаться". Мы хотим разгрузить сервер, отправляя эти задачи в фоновый режим. Сюда и вступает RabbitMQ.

RabbitMQ работает как посредник:

  • Он принимает задачи от вашего приложения (продюсера).
  • Хранит их в очередях.
  • Воркеры Celery забирают задачи и выполняют их.

Это как доставка пиццы:

  • FastAPI/Django — повар, который отдаёт пиццу курьеру.
  • RabbitMQ — курьер, который везёт пиццу клиенту.
  • Celery — клиент, который с радостью ест пиццу (выполняет задачу).

Установка RabbitMQ

Мы начнём с установки RabbitMQ, чтобы он стал частью нашего асинхронного стека.

Если у вас уже установлен Docker, установка RabbitMQ займёт всего несколько минут. Откройте терминал и выполните:


docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
  • Порт 5672 используется для общения RabbitMQ с Celery.
  • Порт 15672 позволяет вам получить доступ к удобному веб-интерфейсу RabbitMQ.

После успешного запуска вы можете открыть браузер и перейти к http://localhost:15672. Логин: guest, пароль: guest.

Если Docker не ваш выбор, проверьте официальную документацию RabbitMQ для установки на вашем компьютере.


Настройка Celery для использования RabbitMQ

Теперь мы свяжем Celery с RabbitMQ в нашем проекте.

Шаг 1: установка зависимостей

Сначала установим Celery и библиотеку kombu, которая обеспечивает связь с RabbitMQ:


pip install celery kombu

Если вы работаете с Django, убедитесь, что Celery уже интегрирован в ваш проект, как это обсуждалось в предыдущих лекциях.

Шаг 2: настройка конфигурации Celery

Теперь создадим или обновим файл конфигурации Celery (например, celery.py в вашем проекте). На этот раз мы укажем RabbitMQ как брокер сообщений.

Пример конфигурации:


from celery import Celery

# Создаём приложение Celery
app = Celery("example_project")

# Указываем RabbitMQ как брокер сообщений
app.conf.broker_url = "amqp://guest:guest@localhost:5672//"

# Дополнительные настройки (опционально)
app.conf.result_backend = "rpc://"
app.conf.task_serializer = "json"  # Для сериализации данных задач
app.conf.accept_content = ["json"]  # Принимаемые форматы данных
app.conf.timezone = "UTC"  # Зона времени
  • amqp://guest:guest@localhost:5672// — строка подключения к RabbitMQ. Здесь используются стандартный логин и пароль гостя. Если вы настроили свои данные для доступа, замените их.

Шаг 3: объявление простой задачи

Добавим простую задачу, чтобы протестировать работу Celery через RabbitMQ. Создайте файл tasks.py:


from celery import shared_task

@shared_task
def add(x, y):
    return x + y

Шаг 4: запуск RabbitMQ и Celery

  1. Убедитесь, что RabbitMQ работает. В случае с Docker, это легко проверить командой:
    
    docker ps
            
  2. Запустите Celery воркеры:
    
    celery -A example_project worker --loglevel=info
            

    Замените example_project на имя вашего проекта.


Проверка работы Celery и RabbitMQ

Теперь, когда всё настроено, вызовем нашу задачу add из Python Shell (или через Django):


from tasks import add

result = add.delay(3, 5)
print(f"Task ID: {result.id}")
print(f"Task Result: {result.get()}")

Посмотрите на логи в терминале, где запущен воркер Celery. Вы должны увидеть обработку задачи. Это означает, что RabbitMQ успешно получил задачу и передал её Celery.


Преимущества использования RabbitMQ с Celery

RabbitMQ предоставляет несколько преимуществ, которые делают его идеальным брокером сообщений для Celery:

  • Сохранность задач. RabbitMQ надёжно хранит задачи даже при сбоях в сети.
  • Масштабируемость. RabbitMQ поддерживает десятки очередей, которые могут обрабатывать Celery-воркеры.
  • Долговечность данных. Вы можете настроить "durable" очереди, чтобы задачи сохранялись даже между перезапусками RabbitMQ.
  • Поддержка высоких нагрузок. RabbitMQ хорошо справляется с передачей большого количества задач одновременно.

Пример более сложной задачи

Давайте создадим более сложную задачу, например, отправку электронной почты:


import time
from celery import shared_task

@shared_task
def send_email(recipient, subject, body):
    print(f"Sending email to {recipient} with subject '{subject}'...")
    time.sleep(5)  # Имитируем долгую отправку
    print("Email sent!")

Такую задачу можно использовать для реальных крупных проектов, где необходимо отправлять уведомления, отчёты или массовую рассылку писем.


Настройка очередей и маршрутов задач

RabbitMQ позволяет создавать разные очереди для разных задач. Например, можно отдельно обрабатывать задачи на отправку писем и задачи по обработке данных.


app.conf.task_routes = {
    "tasks.send_email": {"queue": "email_queue"},
    "tasks.add": {"queue": "math_queue"},
}

Теперь RabbitMQ будет использовать очереди email_queue и math_queue для маршрутизации задач. Это упрощает масштабирование, так как можно запускать воркеров для конкретных очередей.


Примечания о реальной разработке

  1. Если вы используете RabbitMQ в продакшене, обязательно настройте свои логины и пароли. По умолчанию используются "guest/guest", что небезопасно.
  2. Мониторьте производительность RabbitMQ, чтобы избежать перегрузки очередей. Используйте веб-интерфейс RabbitMQ для просмотра загрузки.
  3. В зависимости от задач, может потребоваться тонкая настройка конфигурации воркеров Celery (например, количество процессов, таймауты).

Теперь у вас есть полноценная интеграция Celery с RabbitMQ для обработки асинхронных задач. В следующей лекции мы продолжим создавать более сложные и реальные сценарии, используя силы асинхронных систем.

1
Задача
Модуль 4: FastAPI, 13 уровень, 3 лекция
Недоступна
Создание задачи с Celery
Создание задачи с Celery
1
Задача
Модуль 4: FastAPI, 13 уровень, 3 лекция
Недоступна
Настройка маршрутов задач
Настройка маршрутов задач
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ