Продюсер (producer) — це компонент системи, який відправляє повідомлення в чергу RabbitMQ.
Уявіть його як диспетчера: він фіксує події і передає завдання далі — туди, де ними займеться потрібний сервіс.
Продюсер допомагає розбити логіку додатку на окремі, незалежні блоки. Це особливо зручно, коли є завдання, які можна відкласти або передати іншим частинам системи.
Наприклад:
Уявіть онлайн-магазин. Як тільки користувач оформив замовлення, продюсер одразу відправляє інформацію про це в чергу. А далі інші сервіси (consumers) розбирають її по частинах: один зберігає замовлення в базу даних, інший відправляє лист з підтвердженням, третій — повідомляє службу доставки. Все працює паралельно, без затримок, і користувач не чекає, поки це все відбудеться.
Встановлення бібліотеки pika для роботи з RabbitMQ
У Python найпопулярніша бібліотека для роботи з RabbitMQ — pika.
Вона підтримує основні можливості RabbitMQ і давно себе зарекомендувала. Давайте встановимо її й приступимо до роботи.
Встановлення:
pip install pika
Як бачите, все максимально просто. Тепер ми готові занурюватись у тонкощі створення нашого першого продюсера.
Налаштування з'єднання RabbitMQ
Першим кроком у створенні продюсера є налаштування з'єднання з RabbitMQ. RabbitMQ, по суті, не буде обробляти повідомлення без надання коректних даних про підключення.
Приклад підключення:
import pika
# Налаштовуємо параметри підключення
connection_params = pika.ConnectionParameters('localhost')
# Встановлюємо з'єднання з RabbitMQ
connection = pika.BlockingConnection(connection_params)
# Створюємо канал
channel = connection.channel()
# Успіх! Ви підключилися.
print("З'єднання з RabbitMQ встановлено!")
Що тут відбувається?
- Ми налаштували підключення до RabbitMQ, вказавши, що сервер RabbitMQ працює на
localhost(або127.0.0.1). - Встановили з'єднання і відкрили канал — канал, по суті, є шляхом, яким ваші повідомлення проходять у RabbitMQ.
Оголошення черги
Продюсер не тільки відправляє повідомлення, але також може переконатися, що цільова черга існує. Це допомагає уникнути помилок у випадку, якщо consumer очікує дані з неіснуючої черги.
Як оголосити чергу:
# Оголошуємо чергу з іменем 'hello_queue'.
channel.queue_declare(queue='hello_queue')
print("Черга 'hello_queue' оголошена!")
Якщо такої черги немає, RabbitMQ створить її. Якщо ж вона вже існує, то нічого не станеться. Зручно!
Відправка повідомлення в RabbitMQ
Ну що, час закинути в нашу чергу перше повідомлення?
Приклад відправки повідомлення:
# Відправляємо повідомлення в чергу
message = "Привіт, черга!"
channel.basic_publish(exchange='', routing_key='hello_queue', body=message)
print(f"Повідомлення відправлене: {message}")
Розбір коду:
exchange='': ми не вказуємо exchange, тому використовуємо стандартний (default) exchange.routing_key='hello_queue': вказуємо ім'я черги, в яку відправляється повідомлення.body=message: це наше повідомлення у вигляді байтів, яке публікується в чергу.
Коли цей код виконається, повідомлення буде додано в чергу з ім'ям hello_queue.
Закриття з'єднання
Після завершення роботи продюсер має закрити з'єднання з RabbitMQ. Це хороша практика, яка запобігає витіканню ресурсів.
# Закриваємо з'єднання
connection.close()
print("З'єднання з RabbitMQ закрите.")
Повний приклад: Hello RabbitMQ!
Усі частини зібрані разом. Давайте поглянемо на повний робочий приклад створення продюсера.
Код:
import pika
# 1. Підключення до RabbitMQ
connection_params = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
# 2. Оголошення черги
channel.queue_declare(queue='hello_queue')
# 3. Відправка повідомлення
message = "Привіт, RabbitMQ!"
channel.basic_publish(exchange='', routing_key='hello_queue', body=message)
print(f"Повідомлення відправлене: {message}")
# 4. Закриття з'єднання
connection.close()
print("З'єднання з RabbitMQ закрите.")
Запустіть цей код, і повідомлення успішно опиниться в черзі. Ну а тепер можна переключитися на каву і насолоджуватися.
Обробка помилок при публікації повідомлень
У реальних застосунках ніхто не застрахований від проблем — будь то недоступний RabbitMQ-сервер або збої мережі. Тому важливо налаштувати обробку помилок. Ось приклад:
try:
# Публікація повідомлення
channel.basic_publish(exchange='', routing_key='hello_queue', body=message)
print(f"Повідомлення успішно відправлене: {message}")
except pika.exceptions.AMQPConnectionError:
print("Помилка: не вдалося з'єднатися з RabbitMQ!")
except Exception as e:
print(f"Сталася помилка: {e}")
Ця конструкція дозволить вам виявити й обробити будь-які помилки, не допуская аварійного завершення програми.
Відправка складних повідомлень (JSON)
У реальному житті ми рідко відправляємо прості рядки. А як щодо відправки структурованих даних, наприклад JSON-об'єктів?
Приклад:
import json
message = {'user_id': 123, 'action': 'login', 'timestamp': '2023-10-01T12:00:00Z'}
# Серіалізація об'єкта в JSON
message_json = json.dumps(message)
# Публікація JSON як тіла повідомлення
channel.basic_publish(exchange='', routing_key='hello_queue', body=message_json)
print(f"JSON повідомлення відправлене: {message_json}")
Сторонній сервіс або consumer зможе десеріалізувати JSON і використати його як повноцінний об'єкт Python.
Публікація з підтвердженням (Delivery Mode)
Іноді потрібно переконатися, що повідомлення "не зникне" при збої RabbitMQ. Для цього можна ввімкнути режим підтверджень.
Налаштування:
channel.basic_publish(
exchange='',
routing_key='hello_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2 # Це робить повідомлення персистентним
)
)
print("Персистентне повідомлення відправлене.")
Однак пам'ятайте, що навіть у цьому випадку RabbitMQ не зберігатиме повідомлення, якщо черга не налаштована як постійна durable.
Подальші кроки
Ви щойно створили продюсера для RabbitMQ, а це вже половина успіху в асинхронній системі. У наступній лекції ми зануримося в створення консьюмерів (consumers), щоб вашу чергу не переповнювали забуті повідомлення. Зачекайте... Ви ж не хочете, щоб RabbitMQ "лопнув" від навантаження?
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ