JavaRush /Курси /Модуль 4: FastAPI /Створення продюсерів (producers) для публікації повідомле...

Створення продюсерів (producers) для публікації повідомлень у чергу

Модуль 4: FastAPI
Рівень 12 , Лекція 3
Відкрита

Продюсер (producer) — це компонент системи, який відправляє повідомлення в чергу RabbitMQ.

Уявіть його як диспетчера: він фіксує події і передає завдання далі — туди, де ними займеться потрібний сервіс.

Продюсер допомагає розбити логіку додатку на окремі, незалежні блоки. Це особливо зручно, коли є завдання, які можна відкласти або передати іншим частинам системи.

Наприклад:

Уявіть онлайн-магазин. Як тільки користувач оформив замовлення, продюсер одразу відправляє інформацію про це в чергу. А далі інші сервіси (consumers) розбирають її по частинах: один зберігає замовлення в базу даних, інший відправляє лист з підтвердженням, третій — повідомляє службу доставки. Все працює паралельно, без затримок, і користувач не чекає, поки це все відбудеться.


Встановлення бібліотеки pika для роботи з RabbitMQ

У Python найпопулярніша бібліотека для роботи з RabbitMQ — pika.

Вона підтримує основні можливості RabbitMQ і давно себе зарекомендувала. Давайте встановимо її й приступимо до роботи.

Встановлення:

pip install pika

Як бачите, все максимально просто. Тепер ми готові занурюватись у тонкощі створення нашого першого продюсера.


Налаштування з'єднання RabbitMQ

Першим кроком у створенні продюсера є налаштування з'єднання з RabbitMQ. RabbitMQ, по суті, не буде обробляти повідомлення без надання коректних даних про підключення.

Приклад підключення:


import pika

# Налаштовуємо параметри підключення
connection_params = pika.ConnectionParameters('localhost')

# Встановлюємо з'єднання з RabbitMQ
connection = pika.BlockingConnection(connection_params)

# Створюємо канал
channel = connection.channel()

# Успіх! Ви підключилися.
print("З'єднання з RabbitMQ встановлено!")

Що тут відбувається?

  1. Ми налаштували підключення до RabbitMQ, вказавши, що сервер RabbitMQ працює на localhost (або 127.0.0.1).
  2. Встановили з'єднання і відкрили канал — канал, по суті, є шляхом, яким ваші повідомлення проходять у RabbitMQ.

Оголошення черги

Продюсер не тільки відправляє повідомлення, але також може переконатися, що цільова черга існує. Це допомагає уникнути помилок у випадку, якщо consumer очікує дані з неіснуючої черги.

Як оголосити чергу:


# Оголошуємо чергу з іменем 'hello_queue'.
channel.queue_declare(queue='hello_queue')

print("Черга 'hello_queue' оголошена!")

Якщо такої черги немає, RabbitMQ створить її. Якщо ж вона вже існує, то нічого не станеться. Зручно!


Відправка повідомлення в RabbitMQ

Ну що, час закинути в нашу чергу перше повідомлення?

Приклад відправки повідомлення:


# Відправляємо повідомлення в чергу
message = "Привіт, черга!"
channel.basic_publish(exchange='', routing_key='hello_queue', body=message)

print(f"Повідомлення відправлене: {message}")

Розбір коду:

  • exchange='': ми не вказуємо exchange, тому використовуємо стандартний (default) exchange.
  • routing_key='hello_queue': вказуємо ім'я черги, в яку відправляється повідомлення.
  • body=message: це наше повідомлення у вигляді байтів, яке публікується в чергу.

Коли цей код виконається, повідомлення буде додано в чергу з ім'ям hello_queue.


Закриття з'єднання

Після завершення роботи продюсер має закрити з'єднання з RabbitMQ. Це хороша практика, яка запобігає витіканню ресурсів.


# Закриваємо з'єднання
connection.close()
print("З'єднання з RabbitMQ закрите.")

Повний приклад: Hello RabbitMQ!

Усі частини зібрані разом. Давайте поглянемо на повний робочий приклад створення продюсера.

Код:


import pika

# 1. Підключення до RabbitMQ
connection_params = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()

# 2. Оголошення черги
channel.queue_declare(queue='hello_queue')

# 3. Відправка повідомлення
message = "Привіт, RabbitMQ!"
channel.basic_publish(exchange='', routing_key='hello_queue', body=message)
print(f"Повідомлення відправлене: {message}")

# 4. Закриття з'єднання
connection.close()
print("З'єднання з RabbitMQ закрите.")

Запустіть цей код, і повідомлення успішно опиниться в черзі. Ну а тепер можна переключитися на каву і насолоджуватися.


Обробка помилок при публікації повідомлень

У реальних застосунках ніхто не застрахований від проблем — будь то недоступний RabbitMQ-сервер або збої мережі. Тому важливо налаштувати обробку помилок. Ось приклад:


try:
    # Публікація повідомлення
    channel.basic_publish(exchange='', routing_key='hello_queue', body=message)
    print(f"Повідомлення успішно відправлене: {message}")
except pika.exceptions.AMQPConnectionError:
    print("Помилка: не вдалося з'єднатися з RabbitMQ!")
except Exception as e:
    print(f"Сталася помилка: {e}")

Ця конструкція дозволить вам виявити й обробити будь-які помилки, не допуская аварійного завершення програми.


Відправка складних повідомлень (JSON)

У реальному житті ми рідко відправляємо прості рядки. А як щодо відправки структурованих даних, наприклад JSON-об'єктів?

Приклад:


import json

message = {'user_id': 123, 'action': 'login', 'timestamp': '2023-10-01T12:00:00Z'}

# Серіалізація об'єкта в JSON
message_json = json.dumps(message)

# Публікація JSON як тіла повідомлення
channel.basic_publish(exchange='', routing_key='hello_queue', body=message_json)

print(f"JSON повідомлення відправлене: {message_json}")

Сторонній сервіс або consumer зможе десеріалізувати JSON і використати його як повноцінний об'єкт Python.


Публікація з підтвердженням (Delivery Mode)

Іноді потрібно переконатися, що повідомлення "не зникне" при збої RabbitMQ. Для цього можна ввімкнути режим підтверджень.

Налаштування:


channel.basic_publish(
    exchange='',
    routing_key='hello_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2  # Це робить повідомлення персистентним
    )
)
print("Персистентне повідомлення відправлене.")

Однак пам'ятайте, що навіть у цьому випадку RabbitMQ не зберігатиме повідомлення, якщо черга не налаштована як постійна durable.


Подальші кроки

Ви щойно створили продюсера для RabbitMQ, а це вже половина успіху в асинхронній системі. У наступній лекції ми зануримося в створення консьюмерів (consumers), щоб вашу чергу не переповнювали забуті повідомлення. Зачекайте... Ви ж не хочете, щоб RabbitMQ "лопнув" від навантаження?

Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ