JavaRush /Курсы /Модуль 5. Spring /Лекция 187: Продюсеры и консьюмеры: как они работают

Лекция 187: Продюсеры и консьюмеры: как они работают

Модуль 5. Spring
19 уровень , 6 лекция
Открыта

Настало время погрузиться в то, как работают продюсеры и консьюмеры и что стоит за этими словами, которыми в индустрии так любят бросаться, словно программирование — это сериал про поваров.


Введение в продюсеров и консьюмеров

Итак, начнем. Apache Kafka, словно "город данных", а продюсеры и консьюмеры — его жители. Продюсер — активный житель, который привозит данные в виде "сообщений" и раскладывает их по "топикам" (темам). Консьюмер — это усердный обитатель, который забирает из "топиков" эти сообщения, чтобы обработать и передать их дальше.

Как работают продюсеры?

Продюсер — это клиент Kafka, который публикует сообщения в топики. Благодаря партиционированию Kafka позволяет обрабатывать огромное количество данных одновременно, достигая впечатляющей производительности. Когда данные отправляются продюсером, они записываются в партицию определенного топика.

Механизм работы:

Каждое сообщение имеет ключ и значение. Ключ помогает определить, в какую партицию топика попадет сообщение. Если ключ отсутствует, Kafka применяет алгоритм round-robin, распределяя сообщения равномерно между партициями.

Ключевые моменты работы продюсера:

  • Сообщения отправляются в топик.
  • Если у продюсера задан ключ сообщения, Kafka использует его для выбора партиции.
  • Если ключа нет, сообщения распределяются равномерно.
  • Продюсер может быть настроен для обеспечения гарантий доставки: "at least once", "at most once" или "exactly once".

Пример: если вам нужно обработать заказы из интернет-магазина, продюсер может отправлять сообщения в топик orders, где каждое сообщение содержит данные о новом заказе.


Пример кода продюсера


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        // Конфигурация продюсера
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Создание продюсера
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        try {
            // Отправка сообщения в топик "orders"
            for (int i = 1; i <= 5; i++) {
                String key = "OrderKey-" + i;
                String value = "OrderValue-" + i;
                producer.send(new ProducerRecord<>("orders", key, value));
                System.out.println("Message sent: " + key + " -> " + value);
            }
        } finally {
            producer.close(); // Закрытие продюсера после отправки
        }
    }
}

В этом примере:

  1. Мы настроили Kafka-продюсер через Properties, указав серверы Kafka и сериализацию для ключей и значений.
  2. Создали объект KafkaProducer для отправки сообщений.
  3. Отправили несколько сообщений в топик orders.

Как работают консьюмеры?

Консьюмер в Kafka — это клиент, который извлекает сообщения из топиков. Он читает сообщения из одной или нескольких партиций, имея доступ к данным в определенной последовательности.

Ключевые аспекты:

  • Консьюмеры объединяются в группы. Каждая партиция одного топика может быть обработана только одним консьюмером из группы.
  • Kafka использует offset для отслеживания того, какие сообщения уже были обработаны консьюмером.
  • Консьюмеры могут работать в автоматическом или ручном режимах подтверждения обработки сообщения (commit offset).

Представьте, что ваш интернет-магазин обрабатывает заказы. Консьюмер будет забирать сообщения из топика orders и обрабатывать их, например, отправлять уведомления.

Пример кода консьюмера


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        // Конфигурация консьюмера
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // Создание консюмера
        KafkaConsumer<String, String< consumer = new KafkaConsumer<>(props);

        // Подписываемся на топик "orders"
        consumer.subscribe(Collections.singletonList("orders"));

        try {
            // Бесконечный цикл для чтения сообщений
            while (true) {
                ConsumerRecords<String, String< records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String< record : records) {
                    System.out.println("Received message: " + record.key() + " -> " + record.value());
                }
            }
        } finally {
            consumer.close(); // Закрытие консьюмера
        }
    }
}

В этом примере мы:

  1. Настраиваем конфигурацию, включая ID группы и десериализацию.
  2. Создаём объект KafkaConsumer для подключения к серверу Kafka.
  3. Подписываемся на топик orders.
  4. Читаем сообщения с использованием метода poll, пока программа работает.

Общая архитектура взаимодействия

Компонент Задача
Продюсер Отправляет сообщения в топик
Консьюмер Подписывается на топик и обрабатывает сообщения
Топик Хранит сообщения, организованные в партиции
Партиция Разделяет сообщения внутри топика для параллельной обработки
Offset Отслеживает, какие сообщения были обработаны

Частые ошибки и как их избегать

  1. Если продюсер или консьюмер не видит топик, проверьте корректность настроек bootstrap.servers.
  2. Конфликты offset часто возникают из-за неправильной настройки auto.offset.reset. Используйте earliest для чтения всех сообщений с начала или latest для новых сообщений.
  3. Забудете закрыть продюсера или консьюмера? Не делайте этого, чтобы не столкнуться с утечками ресурсов.

Совет: на реальных проектах всегда логируйте события продюсеров и консьюмеров. Это упростит диагностику проблем.


Kafka — это мощный инструмент, и с пониманием, как работают продюсеры и консьюмеры, перед вами открываются двери в мир асинхронной обработки данных. Вы можете легко строить распределенные системы, интегрировать микросервисы и обеспечивать высокую надежность отправки и обработки сообщений.

Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ