JavaRush /Курсы /Модуль 5. Spring /Лекция 194: Практика: настройка Spring Boot для интеграци...

Лекция 194: Практика: настройка Spring Boot для интеграции с Kafka

Модуль 5. Spring
20 уровень , 3 лекция
Открыта

Наша задача — создать Spring Boot приложение, которое сможет:

  1. Отправлять сообщения в Kafka-топик (продюсер).
  2. Получать сообщения из Kafka-топика (консьюмер).
  3. Использовать конфигурации для работы с Kafka.

Давайте приступим к созданию Kafka-enabled приложения и добавим в него немного магии асинхронных событий.


1. Подготовка проекта

Прежде, чем начать работу с Kafka в Spring Boot, нам нужно настроить проект. На это уйдет примерно 5 минут. Как раз хватит времени на чашку хорошего кофе.

Способ 1: Spring Initializr (рекомендуемый)

  1. Перейдите на Spring Initializr
  2. Укажите параметры проекта:
    • Project: Maven
    • Language: Java
    • Spring Boot: 3.x.x (последняя стабильная версия)
    • Project Metadata:
      • Group: com.example
      • Artifact: kafka-demo
      • Name: kafka-demo
      • Description: Demo project for Spring Boot with Kafka
      • Package name: com.example.kafkademo
    • Packaging: Jar
    • Java: 17
  3. Добавьте зависимости:
    • Spring for Apache Kafka
    • Spring Web
    • Lombok
    • Spring Boot DevTools (опционально, для удобства разработки)

Способ 2: Ручная настройка

Если вы предпочитаете настраивать проект вручную, добавьте следующие зависимости в существующий pom.xml:


<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>provided</scope>
    </dependency>
</dependencies>

После этого обновите Maven (mvn clean install) и убедитесь, что зависимости подтянулись.


2. Настройка Kafka в application.properties

Первым делом убедимся, что наше приложение знает, где искать Kafka, и как с ним общаться. Для этого добавим настройки в файл src/main/resources/application.properties:


# Адрес брокеров Kafka
spring.kafka.bootstrap-servers=localhost:9092

# Сердцебиение между брокером и консьюмером
spring.kafka.consumer.group-id=my-group

# Автоматическая сериализация (для JSON)
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

Здесь:

  • spring.kafka.bootstrap-servers — указывает адрес брокера Kafka (в нашем случае, локальный).
  • Остальные настройки определяют, как сообщения будут сериализоваться и десериализоваться.

3. Создание Kafka-продюсера

Продюсер — это отправитель сообщений. Он берёт сообщение и отправляет его в Kafka-топик. Представьте его как почтальона, который доставляет письма по нужному адресу.

KafkaTemplate

Spring Kafka предоставляет удобный класс KafkaTemplate для отправки сообщений. Давайте создадим слой продюсера.

Реализация

Создайте класс KafkaProducerService в пакете com.example.kafka:


package com.example.kafka;

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send("my_topic", message); // Отправляем сообщение в топик "my_topic"
        System.out.println("Сообщение отправлено в Kafka: " + message);
    }
}
  • KafkaTemplate<String, String> — это наш почтальон для отправки сообщений.
  • Метод sendMessage отправляет текстовое сообщение в топик my_topic.

4. Создание Kafka-консьюмера

Консьюмер — это получатель сообщений. Это тот, кто читает входящие сообщения из Kafka-топиков, как постоянный подписчик новостей.

Аннотация @KafkaListener

Spring Kafka предоставляет аннотацию @KafkaListener, чтобы автоматически обрабатывать входящие сообщения. Настроим простого консьюмера.

Реализация

Создайте класс KafkaConsumerService в пакете com.example.kafka:


package com.example.kafka;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my_topic", groupId = "my-group")
    public void consumeMessage(String message) {
        System.out.println("Получено сообщение из Kafka: " + message);
    }
}
  • Аннотация @KafkaListener(topics = "my_topic", groupId = "my-group") говорит Spring, что этот метод нужно вызывать каждый раз, когда поступает сообщение в топик my_topic.
  • Метод consumeMessage принимает сообщения и логирует их.

5. Создание REST-контроллера для взаимодействия

Чтобы легче тестировать наше приложение, создадим REST API для отправки сообщений в Kafka. REST + Kafka = 🔥!

Реализация

Создайте класс KafkaController в пакете com.example.kafka:


package com.example.kafka;

import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/kafka")
@RequiredArgsConstructor
public class KafkaController {

    private final KafkaProducerService kafkaProducerService;

    @PostMapping("/send")
    public String sendMessage(@RequestParam("message") String message) {
        kafkaProducerService.sendMessage(message);
        return "Сообщение отправлено: " + message;
    }
}
  • @RequestMapping("/api/kafka") задаёт URL для всех методов этого контроллера.
  • @PostMapping("/send") — ожидает POST-запрос с параметром message, который отправляется в Kafka.

6. Запуск и тестирование

Наше приложение готово! Давайте его запустим и протестируем.

Запуск

Предполагается, что у вас уже запущен Kafka-брокер. Если вы ещё не запустили Kafka, сделайте это (инструкции по установке и настройке Kafka есть в лекции 184). Теперь запустите Spring Boot приложение из вашего IDE или командой:

mvn spring-boot:run

Тестирование через Postman или cURL

1. Отправим сообщение в Kafka:


curl -X POST "http://localhost:8080/api/kafka/send?message=Привет, Kafka!"

Вы должны увидеть ответ:


Сообщение отправлено: Привет, Kafka!

2. Проверьте консоль приложения, чтобы убедиться, что сообщение успешно обработано консьюмером:

Получено сообщение из Kafka: Привет, Kafka!

Тестирование через Kafka CLI

Для более глубокого тестирования можно использовать Kafka CLI:

  • Отправка сообщения вручную:
    
    kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
    
  • Просмотр сообщений:
    
    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --from-beginning
    

Ошибки и их устранение

Во время работы могут возникнуть следующие ошибки:

  1. Ошибка подключения к Kafka: проверьте, чтобы Kafka была запущена (команда kafka-server-start.sh).
  2. Отсутствует топик: убедитесь, что топик my_topic создан (kafka-topics.sh --create).
  3. Ошибка десериализации: проверьте настройки сериализации/десериализации в application.properties.

Теперь у вас есть рабочее приложение Spring Boot, интегрированное с Kafka. Этот базовый шаблон легко расширяется для более сложных случаев, таких как отправка объектов, обработка больших потоков данных или реализация асинхронной логики. Добро пожаловать в мир событийно-ориентированной архитектуры!

Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ