Прежде, чем углубиться в код, давайте разберёмся глубже в том, что делает продюсер.
Продюсер (Producer) — это сущность, которая отправляет сообщения в топик.
В мире Kafka продюсер — это тот, кто делится полезной информацией с брокером,
и эта информация потом разлетается по всем заинтересованным "подписчикам".
Представьте себе утреннюю новостную рассылку: вы пишете (производите)
электронные письма (сообщения) и отправляете (публикуете) их в группу новостников (топик).
Kafka сама разберётся, кто из подписчиков прочитает сообщение.
Особенности продюсера:
- Асинхронная отправка сообщений: продюсер отправляет сообщения быстро и не ждёт подтверждений.
- Ключи сообщений: сообщения могут иметь ключ для более точного распределения по партициям.
- Гарантии доставки: Kafka предоставляет гарантии доставки сообщений (
At least once,At most once,Exactly once).
Настройка Spring Boot-проекта
Для работы с Kafka в Spring Boot нам понадобится добавить необходимые зависимости и настроить базовую конфигурацию.
Добавление зависимостей
В файл pom.xml добавим:
<dependencies>
<!-- Spring for Apache Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
Базовая конфигурация
В файл application.properties добавим настройки подключения к Kafka:
# Указываем адрес Kafka-брокера
spring.kafka.bootstrap-servers=localhost:9092
# Указываем сериализаторы для ключей и значений сообщений
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
Создание простого продюсера
Создадим сервис для отправки сообщений в Kafka:
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
private static final String TOPIC = "test-topic";
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message) {
System.out.println("Отправка сообщения: " + message);
kafkaTemplate.send(TOPIC, message);
}
}
- KafkaTemplate — основной инструмент для отправки сообщений в Kafka.
- TOPIC — имя топика, в который будем отправлять сообщения.
- sendMessage() — метод, который отправляет сообщение в Kafka.
Создание REST-контроллера для тестирования
Для удобства тестирования создадим REST-контроллер:
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
private final KafkaProducerService producerService;
public KafkaController(KafkaProducerService producerService) {
this.producerService = producerService;
}
@PostMapping("/api/messages")
public String sendMessage(@RequestParam String message) {
producerService.sendMessage(message);
return "Сообщение отправлено: " + message;
}
}
- KafkaController — REST-контроллер, который принимает HTTP-запросы.
- sendMessage() — обрабатывает POST-запрос на отправку сообщения в Kafka.
- @RequestParam — позволяет передавать сообщение в запросе.
Тестирование продюсера
1. Создание топика
Перед тестированием убедитесь, что топик создан:
bin/kafka-topics.sh --create \
--topic test-topic \
--bootstrap-server localhost:9092 \
--partitions 1 \
--replication-factor 1
2. Отправка тестового сообщения
Используйте curl для отправки сообщения:
curl -X POST "http://localhost:8080/api/messages?message=Привет, Kafka!"
3. Проверка сообщений в Kafka
Запустите консольный потребитель и убедитесь, что сообщение дошло:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
Заключение
Теперь у нас есть полностью рабочий Kafka-продюсер на Spring Boot,
который может отправлять сообщения в топик и обрабатывать их через REST API.
В следующей лекции мы разберём, как создать и настроить Kafka-консьюмера,
чтобы он мог получать и обрабатывать сообщения из топика.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ