JavaRush /Курсы /Модуль 5. Spring /Лекция 197: Создание Kafka консьюмера и получение сообщен...

Лекция 197: Создание Kafka консьюмера и получение сообщений

Модуль 5. Spring
20 уровень , 6 лекция
Открыта

В предыдущих лекциях мы познакомились с основами интеграции Spring Boot и Kafka, создали продюсера и научились отправлять сообщения. Теперь пришло время научиться эти сообщения получать и обрабатывать!

Роль консьюмера в Kafka

Если продюсер — это тот, кто "рассказывает новости", то консьюмер — это "внимательный слушатель". Консьюмеры читают сообщения из указанных топиков и обрабатывают их. Spring Kafka делает эту работу особенно простой благодаря аннотации @KafkaListener, которая превращает ваш метод в "слушателя" сообщений.

Почему консьюмеры так важны?

  1. Обработка данных в реальном времени
    Как только продюсер отправил сообщение, консьюмер может сразу получить его и обработать.
  2. Гибкость
    Консьюмеры позволяют распределять обработку данных по нескольким приложениям.
  3. Масштабируемость
    Kafka поддерживает "группы консьюмеров" (Consumer Groups), что позволяет параллельно обрабатывать сообщения из разных партиций.

Настройка Kafka Consumer в Spring Boot

Прежде всего нам нужно настроить конфигурацию для нашего консьюмера. Создадим класс конфигурации:


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

   @Bean
   public ConsumerFactory<String, String> consumerFactory() {
       Map<String, Object> props = new HashMap<>();
       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
       props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       return new DefaultKafkaConsumerFactory<>(props);
   }

   @Bean
   public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
       ConcurrentKafkaListenerContainerFactory<String, String> factory =
           new ConcurrentKafkaListenerContainerFactory<>();
       factory.setConsumerFactory(consumerFactory());
       return factory;
   }
}

В этом примере:

  • @KafkaListener указывает, какой топик слушать.
  • groupId определяет группу консьюмеров.
  • Метод listen автоматически вызывается при получении сообщения.

Базовая обработка ошибок

Добавим простую обработку ошибок в наш консьюмер:


@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(String message) {
    try {
        System.out.println("Получено сообщение: " + message);
        processMessage(message);
    } catch (Exception e) {
        System.err.println("Ошибка при обработке сообщения: " + e.getMessage());
        // Здесь можно добавить логику обработки ошибок
    }
}

Проверяем работу консьюмера

Создаём топик (если еще не создан):


bin/kafka-topics.sh --create \
    --topic my-topic \
    --bootstrap-server localhost:9092 \
    --partitions 1 \
    --replication-factor 1

Запускаем наше приложение

Отправляем тестовое сообщение через консольный продюсер:


bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

Вводим сообщение и нажимаем Enter. В логах приложения мы должны увидеть:


Получено сообщение: наше тестовое сообщение
Обработка сообщения: наше тестовое сообщение

Что дальше?

Теперь у нас есть базовый консьюмер, который умеет получать и обрабатывать простые сообщения. В следующей лекции мы:

  • Научимся работать с бизнес-объектами на примере класса Order
  • Настроим продвинутую конфигурацию и грамотное логирование
  • Изучим механизмы обработки различных ошибок
  • Рассмотрим реальные сценарии использования консьюмера в production
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ