JavaRush /Курсы /Модуль 4: FastAPI /Создание консьюмеров (consumers) для получения и обработк...

Создание консьюмеров (consumers) для получения и обработки сообщений

Модуль 4: FastAPI
12 уровень , 4 лекция
Открыта

До этой лекции мы уже научились основам работы с очередями сообщений, разобрались с архитектурой RabbitMQ, настроили его базовый сервер и даже создали продюсеров для публикации сообщений. Теперь настало время встретить наших маленьких трудяг — консьюмеров. Они будут получать сообщения из очередей и заниматься их обработкой. Если продюсеры — это писатели, отправляющие письма, то консьюмеры — почтальоны, которые доставляют их адресатам.

Консьюмеры, как мы уже упоминали, — это службы, которые получают сообщения из очереди и выполняют какую-то операцию. Например:

  • Логирование данных.
  • Обработка изображений.
  • Отправка писем пользователям.
  • Сложные вычисления.

Как работает консьюмер?

RabbitMQ передаёт сообщения консьюмерам через протокол AMQP (Advanced Message Queuing Protocol). После того как сообщение получено, консьюмер:

  1. Считывает сообщение.
  2. Выполняет логику обработки.
  3. (Опционально) Подтверждает получение сообщения.

Подтверждения (acknowledgements) позволяют RabbitMQ понять, что сообщение успешно обработано. Если подтверждение не было отправлено, сообщение будет считаться не доставленным и помещено обратно в очередь. Таким образом, система сохраняет целостность данных.


Создание простого консьюмера на Python

Давайте начнём с базового примера создания консьюмера для обработки сообщений. Мы будем использовать библиотеку pika, как и при создании продюсеров.

Для начала давайте её установим:


pip install pika

Вот простой пример кода для консьюмера:


import pika

# Подключаемся к RabbitMQ серверу
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Объявляем очередь (она должна быть такой же, как у продюсера)
channel.queue_declare(queue='hello')

# Функция обработки полученного сообщения
def callback(ch, method, properties, body):
    print(f" [x] Получено сообщение: {body.decode('utf-8')}")
    # Здесь можно поставить любую логику обработки сообщения

# Подписываемся на очередь
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

print(' [*] Ожидание сообщений. Нажмите CTRL+C для завершения')
channel.start_consuming()

Что здесь происходит?

  1. Подключение к RabbitMQ: мы устанавливаем соединение с RabbitMQ сервером, который работает на localhost.
  2. Очередь: объявляем очередь "hello", как мы делали это в продюсере.
  3. Функция обратного вызова (callback): каждый раз, когда сообщение поступает из очереди, вызывается эта функция. Именно тут мы выполняем логику обработки сообщений.
  4. Подписка на очередь: метод basic_consume привязывает обработчик (callback) к очереди.
  5. Блокирующее ожидание: start_consuming() запускает бесконечный цикл, который ждёт новых сообщений.

Обработка полученных сообщений

Когда речь идёт о написании консьюмера, важно понимать, что обработка сообщений должна быть:

  • Устойчивой к ошибкам: если что-то пойдёт не так, сообщение не должно теряться.
  • Эффективной: обработка должна выполняться быстро, чтобы очередь не "забивалась".

Обработаем возможные ошибки внутри нашего консьюмера:


def callback(ch, method, properties, body):
    try:
        print(f" [x] Обрабатываем сообщение: {body.decode('utf-8')}")
        # Выполняем основную логику
        # Например, сохраняем данные в базу
    except Exception as e:
        print(f"[!] Ошибка обработки сообщения: {e}")
    finally:
        ch.basic_ack(delivery_tag=method.delivery_tag)

Здесь мы используем метод basic_ack, чтобы вручную подтвердить обработку сообщения. Это важно в случае, если консьюмер падает во время обработки сообщения. RabbitMQ вернёт такое сообщение обратно в очередь, если подтверждение не было отправлено.


Работаем с несколькими консьюмерами

Консьюмеры могут работать параллельно, обрабатывая сообщения из одной и той же очереди. Это повышает производительность системы.

Попробуем запустить несколько экземпляров нашего консьюмера. Можно просто запустить один и тот же скрипт несколько раз:


python consumer.py
python consumer.py

RabbitMQ автоматически распределяет сообщения между запущенными консьюмерами, избегая дублирования.


Использование prefetch для ограничения нагрузки

По умолчанию RabbitMQ посылает сообщения консьюмерам как можно быстрее, но это может перегрузить их. Для контроля нагрузки используется параметр prefetch_count.

Добавим ограничение на количество сообщений, которые может обработать консьюмер одновременно:


channel.basic_qos(prefetch_count=1)

Теперь RabbitMQ будет отправлять новое сообщение только после того, как предыдущее будет подтверждено.


Практический пример: обработка данных

Давайте напишем пример, где консьюмер будет обрабатывать данные и сохранять их в файл.

Код консьюмера:


import pika

def save_to_file(message):
    with open("messages.log", "a") as f:
        f.write(f"{message}\n")

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks')

def callback(ch, method, properties, body):
    message = body.decode("utf-8")
    print(f"[x] Сохраняем сообщение: {message}")
    save_to_file(message)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='tasks', on_message_callback=callback)

print(" [*] Ожидание сообщений...")
channel.start_consuming()

Теперь, каждый раз, когда сообщение поступает в очередь tasks, оно сохраняется в файл messages.log.


Типичные ошибки и их решение

Работая с консьюмерами, вы можете столкнуться с рядом проблем:

  • Консьюмер падает из-за необработанных исключений: всегда обрабатывайте ошибки внутри callback.
  • Сообщения теряются: если подтверждение (ack) не отправлено, RabbitMQ считает сообщение необработанным.
  • Перегруженные консьюмеры: используйте basic_qos для управления нагрузкой, чтобы RabbitMQ отправлял сообщения только тогда, когда консьюмер готов их обработать.

Сегодня мы познакомились с консьюмерами RabbitMQ, научились подписываться на очереди и обрабатывать сообщения. В следующих лекциях мы углубимся в асинхронную работу с FastAPI и RabbitMQ, чтобы создавать более сложные архитектуры.

1
Задача
Модуль 4: FastAPI, 12 уровень, 4 лекция
Недоступна
Простой обработчик сообщений
Простой обработчик сообщений
1
Задача
Модуль 4: FastAPI, 12 уровень, 4 лекция
Недоступна
Сохранение сообщений в файл
Сохранение сообщений в файл
3
Опрос
Асинхронное взаимодействие с RabbitMQ, 12 уровень, 4 лекция
Недоступен
Асинхронное взаимодействие с RabbitMQ
Асинхронное взаимодействие с RabbitMQ
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ