JavaRush /Курсы /Модуль 5. Spring /Лекция 193: Конфигурация Kafka в Spring Boot приложении

Лекция 193: Конфигурация Kafka в Spring Boot приложении

Модуль 5. Spring
20 уровень , 2 лекция
Открыта

В этой лекции мы углубимся в конфигурацию Kafka в Spring Boot приложении.

Основные настройки Kafka в application.properties

Итак, давайте начнем с простого. Как обычно, для настройки нашего Spring Boot приложения, нам потребуется указать основные параметры в файле конфигурации application.properties или application.yml. Эти параметры позволят приложению "понимать", где находится Kafka, и как с ней взаимодействовать.

Пример основных параметров:


# Адрес Kafka брокеров (можно указать несколько через запятую)
spring.kafka.bootstrap-servers=localhost:9092

# Настройка группы консьюмера
spring.kafka.consumer.group-id=my-group-id

# Авто-коммит смещения сообщений (да/нет?)
spring.kafka.consumer.enable-auto-commit=true

# Сериализация/десериализация ключей и значений
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

Давайте разберем, что здесь происходит:

  • spring.kafka.bootstrap-servers — указывает адреса всех доступных Kafka брокеров. Это стартовая точка, через которую клиент подключается к кластеру Kafka.
  • spring.kafka.consumer.group-id — задаёт идентификатор группы консьюмера. Все консьюмеры, использующие один и тот же group-id, будут совместно обрабатывать сообщения из одного топика.
  • spring.kafka.consumer.enable-auto-commit — позволяет автоматически подтверждать получение сообщений. Установим true для простоты на начальном этапе.
  • spring.kafka.consumer.key-deserializer и value-deserializer — указываем классы для десериализации ключей и сообщений, которые поступают из Kafka.
  • spring.kafka.producer.key-serializer и value-serializer — здесь мы аналогично указываем классы для сериализации ключей и сообщений, отправляемых в Kafka.

Java-конфигурация для Kafka

Хотя настройки в application.properties покрывают большинство базовых случаев, иногда требуется более тонкая настройка. Для этого мы можем воспользоваться Java-конфигурацией.

Давайте создадим конфигурационный класс для настроек продюсера (KafkaTemplate) и консьюмера (ConcurrentKafkaListenerContainerFactory).

8Конфигурационный класс KafkaConfig:*


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    // Конфигурация для продюсера
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Конфигурация для консьюмера
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Вот что здесь происходит:

  1. Мы создаем ProducerFactory и KafkaTemplate для отправки сообщений.
  2. Мы настраиваем ConsumerFactory и ConcurrentKafkaListenerContainerFactory для обработки сообщений с использованием аннотаций @KafkaListener.

Лучшие практики конфигурации

Когда вы работаете с Kafka, важно помнить, что от качества конфигурации зависят производительность и надежность вашего приложения. Вот несколько советов, которые могут вам пригодиться:

  1. Настраивайте значение consumer.poll.timeout: это определяет, как долго консьюмер может ждать новых сообщений из топика. Установите разумный тайм-аут, чтобы избежать задержки обработки.
  2. Используйте надёжную сериализацию и десериализацию: например, вы можете использовать JSON (вместе с библиотекой Jackson) или Avro для сложных сообщений. Это обеспечивает совместимость между продюсерами и консьюмерами.
  3. Обрабатывайте ошибки в продюсерах и консьюмерах: добавьте обработку исключений в точках отправки сообщений и прослушивания топиков. Это поможет избежать ситуации, когда обработка сообщений просто "зависает".

Пример конфигурации с несколькими брокерами

Если у вас кластер Kafka, то вам нужно указать адреса всех доступных брокеров. Spring Boot поддерживает это из коробки.

spring.kafka.bootstrap-servers=broker1:9092,broker2:9092,broker3:9092

Эта настройка позволит приложению подключаться к любому брокеру из указанных. Если один из брокеров недоступен, клиент автоматически переключится на другой.


Проверка конфигурации

После того как вы настроили параметры в application.properties или создали конфигурационный класс, важно проверить, правильно ли ваше приложение взаимодействует с Kafka. Вот несколько шагов для отладки:

  1. Запустите Kafka и убедитесь, что топики созданы.
  2. Используйте тестовый консольный продюсер и консьюмер Kafka:
    kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
    
  3. Проверьте логи приложения: В логах вашего приложения должны появиться сообщения о подключении к Kafka и обработке отправленных/полученных сообщений.

Теперь вы готовы к следующему шагу — созданию продюсера и консьюмера в Spring Boot приложении. Мы рассмотрим их отдельно в следующих лекциях, так что не пропустите!

Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ