JavaRush /Курсы /Модуль 5. Spring /Лекция 188: Практика: создание продюсера и консьюмера для...

Лекция 188: Практика: создание продюсера и консьюмера для отправки и получения сообщений

Модуль 5. Spring
19 уровень , 7 лекция
Открыта

Сегодня мы реализуем более полный пример продюсер-консьюмер. Цели этой лекции:

  1. Закрепить знания об основах взаимодействия с Kafka через реализацию продюсера и консьюмера на Java.
  2. Настроить простое приложение, которое отправляет и получает сообщения через Kafka.
  3. Понять, как управлять параметрами продюсера и консьюмера в целях оптимизации.

Настройка Maven-зависимостей

Для начала нам потребуется добавить в проект зависимости Kafka-клиента. Откройте ваш pom.xml и добавьте следующий фрагмент:


<dependencies>
    <!-- Kafka-клиент -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.5.1</version>
    </dependency>
</dependencies>

Зависимость kafka-clients предоставляет API для работы с Kafka. Убедитесь, что версия Kafka, указанная здесь, совпадает с локально установленной версией Kafka.


Конфигурация Kafka

Перед тем как реализовывать код, давайте определим:

  1. Топик: создаем топик с именем my-first-topic.
  2. Брокер: указываем адрес Kafka-брокера — в данном случае localhost:9092.

Если вы еще не создали топик, выполните следующую команду в терминале:


bin/kafka-topics.sh --create --topic my-first-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
  • --partitions: количество партиций (3 в данном случае).
  • --replication-factor: фактор репликации (1 для локального использования).

Теперь всё готово к написанию продюсера и консьюмера. Вот где начинает оживать магия.


Реализация продюсера

Продюсер отправляет сообщения в топик. Мы создадим простую Java-программу, которая отправляет текстовые сообщения в Kafka.

Создадим продюсер


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class SimpleProducer {

    public static void main(String[] args) {
        // Настройки продюсера
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Адрес Kafka-брокера
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // Сериализация ключа
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // Сериализация значения

        // Создание продюсера
        Producer<String, String> producer = new KafkaProducer<>(props);

        try {
            // Отправляем 10 сообщений
            for (int i = 0; i < 10; i++) {
                String key = "key-" + i;
                String value = "message-" + i;

                // Создаем запись (сообщение) для топика
                ProducerRecord<String, String> record = new ProducerRecord<>("my-first-topic", key, value);

                // Отправляем запись и обрабатываем ответ асинхронно
                producer.send(record, (RecordMetadata metadata, Exception exception) -> {
                    if (exception == null) {
                        System.out.printf("Отправлено сообщение: ключ=%s, значение=%s, партиция=%d, смещение=%d%n",
                                key, value, metadata.partition(), metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                });
            }
        } finally {
            // Закрываем продюсер
            producer.close();
        }
    }
}

В этом примере:

  1. Настройки продюсера: bootstrap.servers указывает Kafka-брокер, key.serializer и value.serializer определяют, как ключи и значения сообщений сериализуются.
  2. ProducerRecord: это сообщение, отправляемое в Kafka. Мы используем ключ, чтобы указать, в какую партицию попадет сообщение.
  3. Callback: метод обратного вызова позволяет узнать, успешно ли отправлено сообщение.
  4. Закрытие продюсера: обязательно закрывайте продюсер, чтобы освободить ресурсы.

Запустите этот код, и сообщения полетят в топик my-first-topic.


Реализация консьюмера

Теперь реализуем консьюмера, который считывает сообщения из топика.


import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {

    public static void main(String[] args) {
        // Настройки консьюмера
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-first-group"); // Группа консьюмеров
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest"); // Читаем сообщения с самого начала топика

        // Создание консьюмера
        Consumer<String, String> consumer = new KafkaConsumer<>(props);

        // Подписываемся на топик
        consumer.subscribe(Collections.singletonList("my-first-topic"));

        try {
            while (true) {
                // Получаем записи из Kafka
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                records.forEach(record -> {
                    System.out.printf("Получено сообщение: ключ=%s, значение=%s, партиция=%d, смещение=%d%n",
                            record.key(), record.value(), record.partition(), record.offset());
                });
            }
        } finally {
            // Закрываем консьюмера
            consumer.close();
        }
    }
}

Вот что происходит в этом примере:

  1. Настройки консьюмера: group.id определяет группу, которой принадлежит консьюмер, auto.offset.reset указывает читать сообщения с начала, если смещения нет.
  2. Подписка на топик: метод subscribe() позволяет подписаться на один или несколько топиков.
  3. poll(): метод для получения новых сообщений из Kafka.
  4. Закрытие консьюмера: также важно закрывать консьюмер.

Запустите этот код, и он выведет сообщения, отправленные нашим продюсером.


Проверка работы

  1. Сначала запустите Kafka и убедитесь, что топик my-first-topic существует.
  2. Запустите SimpleConsumer, который начнет слушать топик.
  3. Запустите SimpleProducer, и вы увидите, что сообщения отправляются продюсером и читаются консьюмером.

Полезные советы

  1. Сериализация и десериализация:
    • Kafka позволяет использовать любые типы данных (например, JSON, Avro). Для этого необходимо воспользоваться соответствующими сериализаторами.
    • Например, для JSON можно использовать org.springframework.kafka.support.serializer.JsonSerializer.
  2. Управление offset:
    • Offset — это "закладка", которая хранит, какие сообщения уже прочитаны.
    • Если нужно читать сообщения повторно, установите enable.auto.commit=false и вручную фиксируйте offset.

Применение в реальной жизни

На практике продюсеры часто используются для записи событий, например, данных о транзакциях, логах или изменениях в базах данных. Консьюмеры, в свою очередь, могут обрабатывать эти события, отправлять уведомления, обновлять кэши или выполнять другие операции.

Kafka — это основа для высоконагруженных систем, таких как платежные шлюзы, системы рекомендаций, аналитика и многое другое. Поэтому понимание работы с Kafka — важный навык для Java-разработчика.

Теперь у вас есть свой продюсер и консьюмер. Вы только что сделали первый шаг в мир асинхронной магии Kafka!

Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ