JavaRush /Курсы /Модуль 5. Spring /Лекция 275: Практика — настройка продюсеров и консьюмеров...

Лекция 275: Практика — настройка продюсеров и консьюмеров Kafka

Модуль 5. Spring
28 уровень , 4 лекция
Открыта

Здесь мы разберёмся, как реализовать продюсеров и консьюмеров Kafka в приложении на Spring Boot. Это решающий шаг, чтобы ваши микросервисы начали активно общаться между собой через события. А как всем известно: "Микросервис без событий — просто молчаливый кусок кода!"


1. Краткий обзор: зачем нам продюсеры и консьюмеры?

Apache Kafka — это система распределённой обработки событий. В ней есть:

  • Продюсеры (Producers) — приложения, которые отправляют сообщения в топики.
  • Консьюмеры (Consumers) — приложения, которые получают сообщения из топиков.

Практическое применение:

Продюсеры и консьюмеры позволяют микросервисам эффективно обрабатывать данные, передавая их друг другу в асинхронном режиме. Например, один микросервис может отправлять заказы в топик, а другой — обрабатывать их, читая из этого топика.


2. Создание Kafka-продюсера

2.1 Подготовка конфигурации Kafka в Spring Boot

Начнём с настройки нашего Spring Boot приложения. Добавьте зависимости Kafka в pom.xml:


<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.9.3</version> <!-- Укажите актуальную версию -->
</dependency>

Теперь создадим конфигурационный класс для Kafka. Этот класс скажет Spring, как подключаться к Kafka и управлять продюсерами.


import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    // Настройки для подключения к Kafka
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Адрес Kafka брокера
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    // KafkaTemplate используется для отправки сообщений
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

2.2 Реализация продюсера

Теперь напишем сервис для отправки сообщений в Kafka. Это будет наш продюсер.


import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Метод для отправки сообщений
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
        System.out.println("Сообщение отправлено в топик: " + topic);
    }
}

2.3 Тестирование продюсера

Для тестирования создадим REST-контроллер, чтобы мы могли отправлять сообщения через HTTP-запросы. Это удобно, чтобы проверить, что всё работает правильно.


import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    private final KafkaProducerService kafkaProducerService;

    public KafkaController(KafkaProducerService kafkaProducerService) {
        this.kafkaProducerService = kafkaProducerService;
    }

    // Отправляем сообщение в Kafka с помощью HTTP GET запроса
    @GetMapping("/send")
    public String sendMessageToKafka(@RequestParam String topic, @RequestParam String message) {
        kafkaProducerService.sendMessage(topic, message);
        return "Сообщение отправлено в Kafka!";
    }
}

Запустите приложение и отправьте GET-запрос через браузер или Postman:

http://localhost:8080/send?topic=test_topic&message=Привет, Kafka!

Если вы видите в консоли сообщение Сообщение отправлено в топик: test_topic, значит продюсер готов!


3. Создание Kafka-консьюмера

3.1 Настройка консьюмера

Добавляем параметры для консьюмера в конфигурационный файл application.properties:


spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3.2 Реализация консьюмера

Теперь создадим сервис для обработки сообщений, поступающих из Kafka.


import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    // Метод будет вызываться автоматически при получении сообщения
    @KafkaListener(topics = "test_topic", groupId = "my-group")
    public void listen(String message) {
        System.out.println("Получено сообщение: " + message);
    }
}

3.3 Проверяем работу консьюмера

Перезапустите приложение и отправьте сообщение через ваш продюсер (например, используя контроллер из прошлого шага). В консоли вы увидите:

Получено сообщение: Привет, Kafka!

Поздравляю, ваш консьюмер успешно обрабатывает сообщения!


4. Настройка сериализации для сложных объектов

Иногда вашим микросервисам нужно передавать не просто текстовые строки, а сложные объекты (например, JSON). Для этого потребуется настроить сериализацию.

4.1 Конфигурация сериализации

Добавьте зависимость для маппера Jackson в pom.xml:


<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

Теперь обновим конфигурацию Kafka, чтобы использовать JsonSerializer и JsonDeserializer.


import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

// Для продюсера
@Bean
public ProducerFactory<String, YourCustomObject> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

// Для консьюмера
@Bean
public ConsumerFactory<String, YourCustomObject> consumerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    return new DefaultKafkaConsumerFactory<>(configProps, new StringDeserializer(), new JsonDeserializer<>(YourCustomObject.class));
}

4.2 Пример работы с кастомными объектами

Создайте объект, который вы хотите передавать:


public class YourCustomObject {
    private String name;
    private int age;

    // Конструкторы, геттеры и сеттеры
}

Теперь можно отправлять и получать объекты через Kafka. Всё работает, как и раньше, но теперь с использованием вашей модели данных.


5. Что делать, если что-то пошло не так?

  • Сообщения не доходят до консьюмера: проверьте, совпадает ли group id в настройках консьюмера и topic в настройках продюсера.
  • Ошибка десериализации: если сериализация/десериализация не работает, убедитесь, что ваш объект имеет конструктор без параметров и правильно настроен Jackson.

Реальное применение

Теперь вы знаете, как ваши микросервисы могут взаимодействовать друг с другом через события Kafka. Этот подход используется в реальных проектах для:

  • Логирования действий пользователей.
  • Отправки уведомлений (например, email или пуш-уведомлений).
  • Синхронизации данных между серверами.

Kafka обеспечивает высокую производительность и надёжность, что делает её незаменимым инструментом для построения масштабируемых архитектур.

Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ