JavaRush /Курсы /Модуль 5. Spring /Лекция 195: Создание Kafka продюсера и отправка сообщений...

Лекция 195: Создание Kafka продюсера и отправка сообщений

Модуль 5. Spring
20 уровень , 4 лекция
Открыта

Прежде, чем углубиться в код, давайте разберёмся глубже в том, что делает продюсер.

Продюсер (Producer) — это сущность, которая отправляет сообщения в топик.
В мире Kafka продюсер — это тот, кто делится полезной информацией с брокером,
и эта информация потом разлетается по всем заинтересованным "подписчикам".
Представьте себе утреннюю новостную рассылку: вы пишете (производите)
электронные письма (сообщения) и отправляете (публикуете) их в группу новостников (топик).
Kafka сама разберётся, кто из подписчиков прочитает сообщение.

Особенности продюсера:

  1. Асинхронная отправка сообщений: продюсер отправляет сообщения быстро и не ждёт подтверждений.
  2. Ключи сообщений: сообщения могут иметь ключ для более точного распределения по партициям.
  3. Гарантии доставки: Kafka предоставляет гарантии доставки сообщений (At least once, At most once, Exactly once).

Настройка Spring Boot-проекта

Для работы с Kafka в Spring Boot нам понадобится добавить необходимые зависимости и настроить базовую конфигурацию.

Добавление зависимостей

В файл pom.xml добавим:


<dependencies>
    <!-- Spring for Apache Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- Spring Boot Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

Базовая конфигурация

В файл application.properties добавим настройки подключения к Kafka:


# Указываем адрес Kafka-брокера
spring.kafka.bootstrap-servers=localhost:9092

# Указываем сериализаторы для ключей и значений сообщений
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

Создание простого продюсера

Создадим сервис для отправки сообщений в Kafka:


import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private static final String TOPIC = "test-topic";
    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        System.out.println("Отправка сообщения: " + message);
        kafkaTemplate.send(TOPIC, message);
    }
}
  • KafkaTemplate — основной инструмент для отправки сообщений в Kafka.
  • TOPIC — имя топика, в который будем отправлять сообщения.
  • sendMessage() — метод, который отправляет сообщение в Kafka.

Создание REST-контроллера для тестирования

Для удобства тестирования создадим REST-контроллер:


import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    private final KafkaProducerService producerService;

    public KafkaController(KafkaProducerService producerService) {
        this.producerService = producerService;
    }

    @PostMapping("/api/messages")
    public String sendMessage(@RequestParam String message) {
        producerService.sendMessage(message);
        return "Сообщение отправлено: " + message;
    }
}
  • KafkaController — REST-контроллер, который принимает HTTP-запросы.
  • sendMessage() — обрабатывает POST-запрос на отправку сообщения в Kafka.
  • @RequestParam — позволяет передавать сообщение в запросе.

Тестирование продюсера

1. Создание топика

Перед тестированием убедитесь, что топик создан:


    bin/kafka-topics.sh --create \
    --topic test-topic \
    --bootstrap-server localhost:9092 \
    --partitions 1 \
    --replication-factor 1

2. Отправка тестового сообщения

Используйте curl для отправки сообщения:


curl -X POST "http://localhost:8080/api/messages?message=Привет, Kafka!"

3. Проверка сообщений в Kafka

Запустите консольный потребитель и убедитесь, что сообщение дошло:


bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning

Заключение

Теперь у нас есть полностью рабочий Kafka-продюсер на Spring Boot,
который может отправлять сообщения в топик и обрабатывать их через REST API.

В следующей лекции мы разберём, как создать и настроить Kafka-консьюмера,
чтобы он мог получать и обрабатывать сообщения из топика.

Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ