JavaRush/Курсы/Модуль 5. Spring/Лекция 200: Настройка сериализации и десериализации данны...

Лекция 200: Настройка сериализации и десериализации данных в Kafka

Открыта

Представьте, что передача данных через Kafka — это пересылка посылок по почте. Для того чтобы отправить посылку, её нужно упаковать (сериализовать), а чтобы получить её обратно в исходном состоянии — необходимо распаковать (десериализовать). Kafka работает точно так же: она пересылает сообщения между продюсером и консьюмером, а эти сообщения должны быть упакованы в понятный Kafka формат.

По умолчанию Kafka может работать с простыми строками или байтовыми массивами. Но кто отправляет Java-объекты в виде байтового массива? Это как отправлять секретные посылки полной незнакомке! Чтобы было понятно, как работать с нашими объектами, нам нужны сериализаторы и десериализаторы, которые превращают данные в формат, понятный обеим сторонам.


Основные понятия сериализации и десериализации

Сериализация — это процесс преобразования объекта в последовательность байтов для его передачи или хранения.
Десериализация, как вы уже догадались, — это обратный процесс, который восстанавливает объект из последовательности байтов.

В контексте Kafka обработка данных с помощью сериализации и десериализации применяется в двух местах:

  1. Kafka-продюсер использует сериализацию для преобразования нашего объекта в байты перед отправкой его в топик.
  2. Kafka-консьюмер использует десериализацию для восстановления объекта из байтов после получения сообщения из топика.

Настройка сериализации и десериализации в Kafka

Для большинства приложений JSON остаётся самым популярным форматом передачи данных. Простота, гибкость и интеграция JSON с Java делают его идеальным кандидатом для работы с Kafka. Для настроек сериализации и десериализации JSON в Spring Kafka мы будем использовать библиотеку Jackson.

Jackson хорошо известен как мощный и быстрый инструмент для работы с JSON в Java. У него есть встроенная поддержка сериализации/десериализации объектов Java, и он легко интегрируется с Spring Boot. Поэтому наш выбор падает именно на него.


Настройка JSON сериализации и десериализации в Spring Kafka

Конфигурация продюсеров и консьюмеров

Для начала нам нужно создать класс конфигурации, который задаёт сериализаторы и десериализаторы для наших продюсеров и консьюмеров.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConfig {

    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    // Конфигурация для продюсера
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Конфигурация для консьюмера
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        JsonDeserializer<Object> deserializer = new JsonDeserializer<>();
        deserializer.addTrustedPackages("*"); // Указаем, какие пакеты доверять
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer.getClass());
        return new DefaultKafkaConsumerFactory<>(configProps, new StringDeserializer(), deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
  1. ProducerFactory и KafkaTemplate — эти компоненты нужны для настройки продюсера. Мы указываем JsonSerializer.class как сериализатор значений, чтобы автоматически преобразовывать Java-объекты в JSON.
  2. ConsumerFactory и KafkaListenerContainerFactory — аналогичные компоненты для консьюмера. Указываем JsonDeserializer для преобразования JSON обратно в объекты Java.
  3. addTrustedPackages для десериализатора позволяет задать пакеты, чьи классы он будет десериализовать. Установка "*" доверяет всем пакетам, но в реальном приложении лучше явно указывать только нужные.

Практика: отправка и получение пользовательских объектов

Давайте создадим класс данных и отправим его через Kafka.

Создаём DTO

public class User {
    private String name;
    private int age;

    // Конструкторы, гетеры и сеттеры
    public User() {}

    public User(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{name='" + name + "', age=" + age + '}';
    }
}

Set up продюсера

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private static final String TOPIC = "users";

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void sendUser(User user) {
        kafkaTemplate.send(TOPIC, user);
        System.out.println("Sent user: " + user);
    }
}

Set up консьюмера

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "users", groupId = "group_id")
    public void consumeUser(User user) {
        System.out.println("Received user: " + user);
    }
}

Запуск и тестирование

  1. Запустите Kafka (если ещё не запустили).
  2. Запустите ваше Spring Boot приложение.
  3. Создайте REST-эндпоинт или вызовите ваше приложение из main() метода, чтобы протестировать отправку и получение объектов.

Пример вызова:

public static void main(String[] args) {
    SpringApplication.run(Application.class, args);

    KafkaProducerService kafkaProducerService = new KafkaProducerService();
    kafkaProducerService.sendUser(new User("Alice", 30));
}

В консоли консьюмера вы должны увидеть что-то вроде:

Received user: User{name='Alice', age=30}

Заключение

Теперь вы знаете, как сериализовать и десериализовать объекты Java для работы с Kafka. М ы использовали JSON как стандартный и удобный формат, но вы можете использовать и другие форматы, такие как Avro или Protocol Buffers, если ваши задачи требуют продвинутых возможностей.

Комментарии
  • популярные
  • новые
  • старые
Для того, чтобы оставить комментарий Вы должны авторизоваться
У этой страницы еще нет ни одного комментария