В предыдущих лекциях мы познакомились с основами интеграции Spring Boot и Kafka, создали продюсера и научились отправлять сообщения. Теперь пришло время научиться эти сообщения получать и обрабатывать!
Роль консьюмера в Kafka
Если продюсер — это тот, кто "рассказывает новости", то консьюмер — это "внимательный слушатель". Консьюмеры читают сообщения из указанных топиков и обрабатывают их. Spring Kafka делает эту работу особенно простой благодаря аннотации @KafkaListener, которая превращает ваш метод в "слушателя" сообщений.
Почему консьюмеры так важны?
- Обработка данных в реальном времени
Как только продюсер отправил сообщение, консьюмер может сразу получить его и обработать. - Гибкость
Консьюмеры позволяют распределять обработку данных по нескольким приложениям. - Масштабируемость
Kafka поддерживает "группы консьюмеров" (Consumer Groups), что позволяет параллельно обрабатывать сообщения из разных партиций.
Настройка Kafka Consumer в Spring Boot
Прежде всего нам нужно настроить конфигурацию для нашего консьюмера. Создадим класс конфигурации:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
В этом примере:
@KafkaListenerуказывает, какой топик слушать.groupIdопределяет группу консьюмеров.- Метод
listenавтоматически вызывается при получении сообщения.
Базовая обработка ошибок
Добавим простую обработку ошибок в наш консьюмер:
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(String message) {
try {
System.out.println("Получено сообщение: " + message);
processMessage(message);
} catch (Exception e) {
System.err.println("Ошибка при обработке сообщения: " + e.getMessage());
// Здесь можно добавить логику обработки ошибок
}
}
Проверяем работу консьюмера
Создаём топик (если еще не создан):
bin/kafka-topics.sh --create \
--topic my-topic \
--bootstrap-server localhost:9092 \
--partitions 1 \
--replication-factor 1
Запускаем наше приложение
Отправляем тестовое сообщение через консольный продюсер:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
Вводим сообщение и нажимаем Enter. В логах приложения мы должны увидеть:
Получено сообщение: наше тестовое сообщение
Обработка сообщения: наше тестовое сообщение
Что дальше?
Теперь у нас есть базовый консьюмер, который умеет получать и обрабатывать простые сообщения. В следующей лекции мы:
- Научимся работать с бизнес-объектами на примере класса
Order - Настроим продвинутую конфигурацию и грамотное логирование
- Изучим механизмы обработки различных ошибок
- Рассмотрим реальные сценарии использования консьюмера в production
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ