JavaRush /Курсы /Модуль 4: FastAPI /Создание продюсеров (producers) для публикации сообщений ...

Создание продюсеров (producers) для публикации сообщений в очередь

Модуль 4: FastAPI
12 уровень , 3 лекция
Открыта

Продюсер (producer) — это компонент системы, который отправляет сообщения в очередь RabbitMQ.

Представьте его как диспетчера: он фиксирует события и передаёт задачи дальше — туда, где ими займётся нужный сервис.

Продюсер помогает разбить логику приложения на отдельные, независимые блоки. Это особенно удобно, когда есть задачи, которые можно отложить или передать другим частям системы.

Например:

Представьте онлайн-магазин. Как только пользователь оформил заказ, продюсер сразу же отправляет информацию об этом в очередь. А дальше другие сервисы (консьюмеры) разбирают её по частям: один сохраняет заказ в базу данных, другой отправляет письмо с подтверждением, третий — уведомляет службу доставки. Всё работает параллельно, без задержек, и пользователь не ждёт, пока всё это произойдёт.


Установка библиотеки pika для работы с RabbitMQ

В Python наиболее популярная библиотека для работы с RabbitMQ — pika.

Она поддерживает основные функции RabbitMQ и уже давно зарекомендовала себя. Давайте установим её и приступим к работе.

Установка:

pip install pika

Как видите, всё максимально просто. Теперь мы готовы погружаться в дебри создания нашего первого продюсера.


Настройка соединения RabbitMQ

Первым шагом в создании продюсера является настройка соединения с RabbitMQ. RabbitMQ, по своей сути, не будет обрабатывать сообщения без предоставления корректных данных о подключении.

Пример подключения:


import pika

# Настраиваем параметры подключения
connection_params = pika.ConnectionParameters('localhost')

# Устанавливаем соединение с RabbitMQ
connection = pika.BlockingConnection(connection_params)

# Создаем канал
channel = connection.channel()

# Успех! Вы подключились.
print("Соединение с RabbitMQ установлено!")

Что здесь происходит?

  1. Мы настроили подключение к RabbitMQ, указав, что сервер RabbitMQ работает на localhost (или 127.0.0.1).
  2. Установили соединение и открыли канал — канал, по сути, является путём, по которому ваши сообщения проходят в RabbitMQ.

Объявление очереди

Продюсер не только отправляет сообщения, но также может удостовериться, что целевая очередь существует. Это помогает избежать ошибок в случае, если консьюмер ожидает данные из несуществующей очереди.

Как объявить очередь:


# Объявляем очередь с именем 'hello_queue'.
channel.queue_declare(queue='hello_queue')

print("Очередь 'hello_queue' объявлена!")

Если такой очереди нет, RabbitMQ создаст её. Если же она уже существует, то ничего не произойдет. Удобно!


Отправка сообщения в RabbitMQ

Ну что, пора затарить нашу очередь первым сообщением?

Пример отправки сообщения:


# Отправляем сообщение в очередь
message = "Привет, очередь!"
channel.basic_publish(exchange='', routing_key='hello_queue', body=message)

print(f"Сообщение отправлено: {message}")

Разбор кода:

  • exchange='': мы не указываем обмен (exchange), поэтому используем стандартный (default) обмен.
  • routing_key='hello_queue': указываем имя очереди, в которую отправляется сообщение.
  • body=message: это наше сообщение в виде байтов, которое публикуется в очередь.

Когда этот код выполнится, сообщение будет добавлено в очередь с именем hello_queue.


Закрытие соединения

После завершения работы продюсер должен закрыть соединение с RabbitMQ. Это хорошая практика, предотвращающая утечки ресурсов.


# Закрываем соединение
connection.close()
print("Соединение с RabbitMQ закрыто.")

Полный пример: Hello RabbitMQ!

Все части собраны вместе. Давайте взглянем на полный рабочий пример создания продюсера.

Код:


import pika

# 1. Подключение к RabbitMQ
connection_params = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()

# 2. Объявление очереди
channel.queue_declare(queue='hello_queue')

# 3. Отправка сообщения
message = "Привет, RabbitMQ!"
channel.basic_publish(exchange='', routing_key='hello_queue', body=message)
print(f"Сообщение отправлено: {message}")

# 4. Закрытие соединения
connection.close()
print("Соединение с RabbitMQ закрыто.")

Запустите этот код, и сообщение благополучно окажется в очереди. Ну а теперь можно переключиться на кофе и наслаждаться.


Обработка ошибок при публикации сообщений

В реальных приложениях никто не застрахован от проблем — будь то недоступный RabbitMQ-сервер или сбои сети. Поэтому важно настроить обработку ошибок. Вот пример:


try:
    # Публикация сообщения
    channel.basic_publish(exchange='', routing_key='hello_queue', body=message)
    print(f"Сообщение успешно отправлено: {message}")
except pika.exceptions.AMQPConnectionError:
    print("Ошибка: не удалось соединиться с RabbitMQ!")
except Exception as e:
    print(f"Произошла ошибка: {e}")

Эта конструкция позволит вам обнаружить и обработать любые ошибки, не допуская аварийного завершения программы.


Отправка сложных сообщений (JSON)

В реальной жизни мы редко отправляем простые строки. А как насчет отправки структурированных данных, например JSON-объектов?

Пример:


import json

message = {'user_id': 123, 'action': 'login', 'timestamp': '2023-10-01T12:00:00Z'}

# Сериализация объекта в JSON
message_json = json.dumps(message)

# Публикация JSON как тела сообщения
channel.basic_publish(exchange='', routing_key='hello_queue', body=message_json)

print(f"JSON сообщение отправлено: {message_json}")

Сторонний сервис или консьюмер сможет десериализовать JSON и использовать его как полноценный объект Python.


Публикация с подтверждением (Delivery Mode)

Иногда требуется убедиться, что сообщение "не исчезнет" при сбое RabbitMQ. Для этого можно включить режим подтверждений.

Настройка:


channel.basic_publish(
    exchange='',
    routing_key='hello_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2  # Это делает сообщение персистентным
    )
)
print("Персистентное сообщение отправлено.")

Однако помните, что даже в этом случае RabbitMQ не хранит сообщения, если очередь не настроена как постоянная durable.


Дальнейшие шаги

Вы только что создали продюсера для RabbitMQ, а это уже половина успеха в асинхронной системе. В следующей лекции мы погрузимся в создание консьюмеров (consumers), чтобы вашу очередь не переполняли забытые сообщения. Подождите... Вы же не хотите, чтобы RabbitMQ "лопнул" от нагрузки?

1
Задача
Модуль 4: FastAPI, 12 уровень, 3 лекция
Недоступна
Создание простого продюсера
Создание простого продюсера
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ