JavaRush /Курсы /Модуль 5. Spring /Лекция 196: Практика: создание простого продюсера для отп...

Лекция 196: Практика: создание простого продюсера для отправки сообщений в топик

Модуль 5. Spring
20 уровень , 5 лекция
Открыта

В прошлой лекции мы создали базовый продюсер для отправки сообщений в Kafka. Сегодня мы расширим его функциональность, добавив обработку ошибок, работу с ключами и асинхронную отправку сообщений.


Работа с ключами сообщений

Ключи в Kafka помогают определить, в какую партицию попадёт сообщение.
Все сообщения с одинаковым ключом всегда попадают в одну и ту же партицию,
что гарантирует сохранение порядка для этих сообщений.


@Service
public class KafkaProducerService {
    private final KafkaTemplate<String, String> kafkaTemplate;
    private static final String TOPIC = "test-topic";

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessageWithKey(String key, String message) {
        System.out.println("Отправка сообщения с ключом [" + key + "]: " + message);
        kafkaTemplate.send(TOPIC, key, message);
    }
}

Добавление метода в контроллер:


@RestController
@RequestMapping("/api/kafka")
public class KafkaController {
    private final KafkaProducerService producerService;

    @PostMapping("/send-with-key")
    public String sendMessageWithKey(
            @RequestParam String key,
            @RequestParam String message) {
        producerService.sendMessageWithKey(key, message);
        return String.format("Сообщение отправлено с ключом %s: %s", key, message);
    }
}

Обработка ошибок и логирование

Добавление Callback-функций

KafkaTemplate возвращает ListenableFuture, который позволяет отслеживать результат отправки:


    public void sendMessageWithCallback(String key, String message) {
    kafkaTemplate.send(TOPIC, key, message)
        .addCallback(
            result -> {
                RecordMetadata metadata = result.getRecordMetadata();
                System.out.printf(
                    "Сообщение успешно отправлено в топик %s, партиция [%d], оффсет %d%n",
                    metadata.topic(),
                    metadata.partition(),
                    metadata.offset()
                );
            },
            ex -> System.err.println("Ошибка при отправке: " + ex.getMessage())
        );
}

Асинхронная отправка с CompletableFuture

Для более гибкой работы с асинхронными результатами:


    public CompletableFuture<SendResult<String, String>> sendMessageAsync(
        String key, 
        String message) {
    return kafkaTemplate.send(TOPIC, key, message)
        .completable()
        .whenComplete((result, ex) -> {
            if (ex == null) {
                RecordMetadata metadata = result.getRecordMetadata();
                System.out.printf(
                    "Асинхронно отправлено в партицию %d с оффсетом %d%n",
                    metadata.partition(),
                    metadata.offset()
                );
            } else {
                System.err.println("Ошибка асинхронной отправки: " + ex.getMessage());
            }
        });
}

Продвинутая конфигурация продюсера

Добавим расширенные настройки в application.properties:


# Базовые настройки
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# Дополнительные настройки
spring.kafka.producer.acks=all
spring.kafka.producer.retries=3
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.properties.retry.backoff.ms=1000
  • acks=all — ждём подтверждения от всех брокеров.
  • retries=3 — количество попыток повторной отправки.
  • batch-size=16384 — размер батча для группировки сообщений.
  • buffer-memory=33554432 — размер буфера для сообщений.
  • retry.backoff.ms=1000 — задержка между повторными попытками.

Отправка транзакционных сообщений

Для случаев, когда нужна строгая гарантия exactly-once:


@Transactional
public void sendMessagesInTransaction(List<String> messages) {
    messages.forEach(msg -> kafkaTemplate.send(TOPIC, msg));
}

Kafka позволяет использовать транзакции для отправки сообщений,
что обеспечивает гарантию, что все сообщения либо будут доставлены, либо не будут отправлены вовсе.


Выводы

Сегодня мы рассмотрели:

  • Как использовать ключи для управления маршрутизацией сообщений.
  • Как отслеживать успешность отправки сообщений.
  • Как работать с асинхронной отправкой сообщений.
  • Как настраивать параметры Kafka-продюсера для повышения надёжности.
  • Как работать с транзакциями в Kafka.

Теперь наш продюсер более надёжный и готов к использованию в реальных приложениях!

Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ