До цієї лекції ми вже навчились основам роботи з чергами повідомлень, розібралися з архітектурою RabbitMQ, налаштували його базовий сервер і навіть створили продюсерів для публікації повідомлень. Тепер настав час познайомитись з нашими маленькими працівниками — консюмери. Вони будуть отримувати повідомлення з черг і займатись їх обробкою. Якщо продюсери — це письменники, які відправляють листи, то консюмери — листоноші, які доставляють їх адресатам.
Консюмери, як ми вже згадували, — це служби, які отримують повідомлення з черги і виконують якусь операцію. Наприклад:
- Логування даних.
- Обробка зображень.
- Відправка листів користувачам.
- Складні обчислення.
Як працює консюмер?
RabbitMQ передає повідомлення консюмерів через протокол AMQP (Advanced Message Queuing Protocol). Після того як повідомлення отримано, консюмер:
- Зчитує повідомлення.
- Виконує логіку обробки.
- (Опціонально) Підтверджує отримання повідомлення.
Підтвердження (acknowledgements) дозволяють RabbitMQ зрозуміти, що повідомлення успішно оброблено. Якщо підтвердження не було відправлене, повідомлення вважатиметься не доставленим і буде поміщене назад у чергу. Таким чином система зберігає цілісність даних.
Створення простого консюмеру на Python
Давайте почнемо з базового прикладу створення консюмеру для обробки повідомлень. Ми будемо використовувати бібліотеку pika, як і при створенні продюсерів.
Для початку встановимо її:
pip install pika
Ось простий приклад коду для консюмеру:
import pika
# Підключаємось до RabbitMQ-сервера
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Оголошуємо чергу (вона має бути така ж, як у продюсера)
channel.queue_declare(queue='hello')
# Функція обробки отриманого повідомлення
def callback(ch, method, properties, body):
print(f" [x] Отримано повідомлення: {body.decode('utf-8')}")
# Тут можна помістити будь-яку логіку обробки повідомлення
# Підписуємось на чергу
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Очікування повідомлень. Натисніть CTRL+C для завершення')
channel.start_consuming()
Що тут відбувається?
- Підключення до RabbitMQ: ми встановлюємо з'єднання з RabbitMQ-сервером, який працює на
localhost. - Черга: оголошуємо чергу
"hello", як ми робили це в продюсері. - Функція зворотного виклику (callback): щоразу, коли повідомлення надходить з черги, викликається ця функція. Саме тут ми виконуємо логіку обробки повідомлень.
- Підписка на чергу: метод
basic_consumeприв'язує обробник (callback) до черги. - Блокуюче очікування:
start_consuming()запускає нескінченний цикл, який чекає нових повідомлень.
Обробка отриманих повідомлень
Коли йдеться про написання консюмеру, важливо розуміти, що обробка повідомлень повинна бути:
- Стійкою до помилок: якщо щось піде не так, повідомлення не повинно загубитися.
- Ефективною: обробка має виконуватись швидко, щоб черга не "забивалась".
Обробимо можливі помилки всередині нашого консюмеру:
def callback(ch, method, properties, body):
try:
print(f" [x] Обробляємо повідомлення: {body.decode('utf-8')}")
# Виконуємо основну логіку
# Наприклад, зберігаємо дані в базу
except Exception as e:
print(f"[!] Помилка обробки повідомлення: {e}")
finally:
ch.basic_ack(delivery_tag=method.delivery_tag)
Тут ми використовуємо метод basic_ack, щоб вручну підтвердити обробку повідомлення. Це важливо на випадок, якщо консюмер падає під час обробки повідомлення. RabbitMQ поверне таке повідомлення назад у чергу, якщо підтвердження не було відправлено.
Працюємо з кількома консюмерими
Консюмери можуть працювати паралельно, обробляючи повідомлення з однієї і тієї ж черги. Це підвищує продуктивність системи.
Спробуємо запустити кілька екземплярів нашого консюмеру. Можна просто запустити один і той же скрипт кілька разів:
python consumer.py
python consumer.py
RabbitMQ автоматично розподіляє повідомлення між запущеними консюмерими, уникаючи дублювання.
Використання prefetch для обмеження навантаження
За замовчуванням RabbitMQ надсилає повідомлення консюмерим якнайшвидше, але це може перевантажити їх. Для контролю навантаження використовується параметр prefetch_count.
Додамо обмеження на кількість повідомлень, які може обробляти консюмер одночасно:
channel.basic_qos(prefetch_count=1)
Тепер RabbitMQ буде надсилати нове повідомлення тільки після того, як попереднє буде підтверджене.
Практичний приклад: обробка даних
Давайте напишемо приклад, де консюмер буде обробляти дані і зберігати їх у файл.
Код консюмеру:
import pika
def save_to_file(message):
with open("messages.log", "a") as f:
f.write(f"{message}\n")
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks')
def callback(ch, method, properties, body):
message = body.decode("utf-8")
print(f"[x] Зберігаємо повідомлення: {message}")
save_to_file(message)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='tasks', on_message_callback=callback)
print(" [*] Очікування повідомлень...")
channel.start_consuming()
Тепер щоразу, коли повідомлення надходить у чергу tasks, воно зберігається у файл messages.log.
Типові помилки та їх вирішення
Працюючи з консюмерими, ви можете зіткнутись з низкою проблем:
- Консюмер падає через необроблені виключення: завжди обробляйте помилки всередині
callback. - Повідомлення втрачаються: якщо підтвердження (ack) не відправлене, RabbitMQ вважає повідомлення необробленим.
- Перевантажені консюмери: використовуйте
basic_qosдля керування навантаженням, щоб RabbitMQ надсилав повідомлення тільки тоді, коли консюмер готовий їх обробити.
Сьогодні ми познайомилися з консюмерими RabbitMQ, навчилися підписуватись на черги і обробляти повідомлення. У наступних лекціях ми заглибимось в асинхронну роботу з FastAPI і RabbitMQ, щоб створювати більш складні архітектури.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ