Наша задача — создать Spring Boot приложение, которое сможет:
- Отправлять сообщения в Kafka-топик (продюсер).
- Получать сообщения из Kafka-топика (консьюмер).
- Использовать конфигурации для работы с Kafka.
Давайте приступим к созданию Kafka-enabled приложения и добавим в него немного магии асинхронных событий.
1. Подготовка проекта
Прежде, чем начать работу с Kafka в Spring Boot, нам нужно настроить проект. На это уйдет примерно 5 минут. Как раз хватит времени на чашку хорошего кофе.
Способ 1: Spring Initializr (рекомендуемый)
- Перейдите на Spring Initializr
- Укажите параметры проекта:
- Project: Maven
- Language: Java
- Spring Boot: 3.x.x (последняя стабильная версия)
- Project Metadata:
- Group: com.example
- Artifact: kafka-demo
- Name: kafka-demo
- Description: Demo project for Spring Boot with Kafka
- Package name: com.example.kafkademo
- Packaging: Jar
- Java: 17
- Добавьте зависимости:
- Spring for Apache Kafka
- Spring Web
- Lombok
- Spring Boot DevTools (опционально, для удобства разработки)
Способ 2: Ручная настройка
Если вы предпочитаете настраивать проект вручную, добавьте следующие зависимости в существующий pom.xml:
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
После этого обновите Maven (mvn clean install) и убедитесь, что зависимости подтянулись.
2. Настройка Kafka в application.properties
Первым делом убедимся, что наше приложение знает, где искать Kafka, и как с ним общаться. Для этого добавим настройки в файл src/main/resources/application.properties:
# Адрес брокеров Kafka
spring.kafka.bootstrap-servers=localhost:9092
# Сердцебиение между брокером и консьюмером
spring.kafka.consumer.group-id=my-group
# Автоматическая сериализация (для JSON)
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
Здесь:
spring.kafka.bootstrap-servers— указывает адрес брокера Kafka (в нашем случае, локальный).- Остальные настройки определяют, как сообщения будут сериализоваться и десериализоваться.
3. Создание Kafka-продюсера
Продюсер — это отправитель сообщений. Он берёт сообщение и отправляет его в Kafka-топик. Представьте его как почтальона, который доставляет письма по нужному адресу.
KafkaTemplate
Spring Kafka предоставляет удобный класс KafkaTemplate для отправки сообщений. Давайте создадим слой продюсера.
Реализация
Создайте класс KafkaProducerService в пакете com.example.kafka:
package com.example.kafka;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
kafkaTemplate.send("my_topic", message); // Отправляем сообщение в топик "my_topic"
System.out.println("Сообщение отправлено в Kafka: " + message);
}
}
KafkaTemplate<String, String>— это наш почтальон для отправки сообщений.- Метод
sendMessageотправляет текстовое сообщение в топикmy_topic.
4. Создание Kafka-консьюмера
Консьюмер — это получатель сообщений. Это тот, кто читает входящие сообщения из Kafka-топиков, как постоянный подписчик новостей.
Аннотация @KafkaListener
Spring Kafka предоставляет аннотацию @KafkaListener, чтобы автоматически обрабатывать входящие сообщения. Настроим простого консьюмера.
Реализация
Создайте класс KafkaConsumerService в пакете com.example.kafka:
package com.example.kafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "my_topic", groupId = "my-group")
public void consumeMessage(String message) {
System.out.println("Получено сообщение из Kafka: " + message);
}
}
- Аннотация
@KafkaListener(topics = "my_topic", groupId = "my-group")говорит Spring, что этот метод нужно вызывать каждый раз, когда поступает сообщение в топикmy_topic. - Метод
consumeMessageпринимает сообщения и логирует их.
5. Создание REST-контроллера для взаимодействия
Чтобы легче тестировать наше приложение, создадим REST API для отправки сообщений в Kafka. REST + Kafka = 🔥!
Реализация
Создайте класс KafkaController в пакете com.example.kafka:
package com.example.kafka;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/kafka")
@RequiredArgsConstructor
public class KafkaController {
private final KafkaProducerService kafkaProducerService;
@PostMapping("/send")
public String sendMessage(@RequestParam("message") String message) {
kafkaProducerService.sendMessage(message);
return "Сообщение отправлено: " + message;
}
}
@RequestMapping("/api/kafka")задаёт URL для всех методов этого контроллера.@PostMapping("/send")— ожидает POST-запрос с параметромmessage, который отправляется в Kafka.
6. Запуск и тестирование
Наше приложение готово! Давайте его запустим и протестируем.
Запуск
Предполагается, что у вас уже запущен Kafka-брокер. Если вы ещё не запустили Kafka, сделайте это (инструкции по установке и настройке Kafka есть в лекции 184). Теперь запустите Spring Boot приложение из вашего IDE или командой:
mvn spring-boot:run
Тестирование через Postman или cURL
1. Отправим сообщение в Kafka:
curl -X POST "http://localhost:8080/api/kafka/send?message=Привет, Kafka!"
Вы должны увидеть ответ:
Сообщение отправлено: Привет, Kafka!
2. Проверьте консоль приложения, чтобы убедиться, что сообщение успешно обработано консьюмером:
Получено сообщение из Kafka: Привет, Kafka!
Тестирование через Kafka CLI
Для более глубокого тестирования можно использовать Kafka CLI:
- Отправка сообщения вручную:
kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic - Просмотр сообщений:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --from-beginning
Ошибки и их устранение
Во время работы могут возникнуть следующие ошибки:
- Ошибка подключения к Kafka: проверьте, чтобы Kafka была запущена (команда
kafka-server-start.sh). - Отсутствует топик: убедитесь, что топик
my_topicсоздан (kafka-topics.sh --create). - Ошибка десериализации: проверьте настройки сериализации/десериализации в
application.properties.
Теперь у вас есть рабочее приложение Spring Boot, интегрированное с Kafka. Этот базовый шаблон легко расширяется для более сложных случаев, таких как отправка объектов, обработка больших потоков данных или реализация асинхронной логики. Добро пожаловать в мир событийно-ориентированной архитектуры!
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ