Здесь мы разберёмся, как реализовать продюсеров и консьюмеров Kafka в приложении на Spring Boot. Это решающий шаг, чтобы ваши микросервисы начали активно общаться между собой через события. А как всем известно: "Микросервис без событий — просто молчаливый кусок кода!"
1. Краткий обзор: зачем нам продюсеры и консьюмеры?
Apache Kafka — это система распределённой обработки событий. В ней есть:
- Продюсеры (Producers) — приложения, которые отправляют сообщения в топики.
- Консьюмеры (Consumers) — приложения, которые получают сообщения из топиков.
Практическое применение:
Продюсеры и консьюмеры позволяют микросервисам эффективно обрабатывать данные, передавая их друг другу в асинхронном режиме. Например, один микросервис может отправлять заказы в топик, а другой — обрабатывать их, читая из этого топика.
2. Создание Kafka-продюсера
2.1 Подготовка конфигурации Kafka в Spring Boot
Начнём с настройки нашего Spring Boot приложения. Добавьте зависимости Kafka в pom.xml:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.9.3</version> <!-- Укажите актуальную версию -->
</dependency>
Теперь создадим конфигурационный класс для Kafka. Этот класс скажет Spring, как подключаться к Kafka и управлять продюсерами.
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
// Настройки для подключения к Kafka
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Адрес Kafka брокера
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
// KafkaTemplate используется для отправки сообщений
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
2.2 Реализация продюсера
Теперь напишем сервис для отправки сообщений в Kafka. Это будет наш продюсер.
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// Метод для отправки сообщений
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
System.out.println("Сообщение отправлено в топик: " + topic);
}
}
2.3 Тестирование продюсера
Для тестирования создадим REST-контроллер, чтобы мы могли отправлять сообщения через HTTP-запросы. Это удобно, чтобы проверить, что всё работает правильно.
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
private final KafkaProducerService kafkaProducerService;
public KafkaController(KafkaProducerService kafkaProducerService) {
this.kafkaProducerService = kafkaProducerService;
}
// Отправляем сообщение в Kafka с помощью HTTP GET запроса
@GetMapping("/send")
public String sendMessageToKafka(@RequestParam String topic, @RequestParam String message) {
kafkaProducerService.sendMessage(topic, message);
return "Сообщение отправлено в Kafka!";
}
}
Запустите приложение и отправьте GET-запрос через браузер или Postman:
http://localhost:8080/send?topic=test_topic&message=Привет, Kafka!
Если вы видите в консоли сообщение Сообщение отправлено в топик: test_topic, значит продюсер готов!
3. Создание Kafka-консьюмера
3.1 Настройка консьюмера
Добавляем параметры для консьюмера в конфигурационный файл application.properties:
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
3.2 Реализация консьюмера
Теперь создадим сервис для обработки сообщений, поступающих из Kafka.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
// Метод будет вызываться автоматически при получении сообщения
@KafkaListener(topics = "test_topic", groupId = "my-group")
public void listen(String message) {
System.out.println("Получено сообщение: " + message);
}
}
3.3 Проверяем работу консьюмера
Перезапустите приложение и отправьте сообщение через ваш продюсер (например, используя контроллер из прошлого шага). В консоли вы увидите:
Получено сообщение: Привет, Kafka!
Поздравляю, ваш консьюмер успешно обрабатывает сообщения!
4. Настройка сериализации для сложных объектов
Иногда вашим микросервисам нужно передавать не просто текстовые строки, а сложные объекты (например, JSON). Для этого потребуется настроить сериализацию.
4.1 Конфигурация сериализации
Добавьте зависимость для маппера Jackson в pom.xml:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
Теперь обновим конфигурацию Kafka, чтобы использовать JsonSerializer и JsonDeserializer.
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
// Для продюсера
@Bean
public ProducerFactory<String, YourCustomObject> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
// Для консьюмера
@Bean
public ConsumerFactory<String, YourCustomObject> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(configProps, new StringDeserializer(), new JsonDeserializer<>(YourCustomObject.class));
}
4.2 Пример работы с кастомными объектами
Создайте объект, который вы хотите передавать:
public class YourCustomObject {
private String name;
private int age;
// Конструкторы, геттеры и сеттеры
}
Теперь можно отправлять и получать объекты через Kafka. Всё работает, как и раньше, но теперь с использованием вашей модели данных.
5. Что делать, если что-то пошло не так?
- Сообщения не доходят до консьюмера: проверьте, совпадает ли
group idв настройках консьюмера иtopicв настройках продюсера. - Ошибка десериализации: если сериализация/десериализация не работает, убедитесь, что ваш объект имеет конструктор без параметров и правильно настроен Jackson.
Реальное применение
Теперь вы знаете, как ваши микросервисы могут взаимодействовать друг с другом через события Kafka. Этот подход используется в реальных проектах для:
- Логирования действий пользователей.
- Отправки уведомлений (например, email или пуш-уведомлений).
- Синхронизации данных между серверами.
Kafka обеспечивает высокую производительность и надёжность, что делает её незаменимым инструментом для построения масштабируемых архитектур.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ