В прошлой лекции мы создали базовый продюсер для отправки сообщений в Kafka. Сегодня мы расширим его функциональность, добавив обработку ошибок, работу с ключами и асинхронную отправку сообщений.
Работа с ключами сообщений
Ключи в Kafka помогают определить, в какую партицию попадёт сообщение.
Все сообщения с одинаковым ключом всегда попадают в одну и ту же партицию,
что гарантирует сохранение порядка для этих сообщений.
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
private static final String TOPIC = "test-topic";
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessageWithKey(String key, String message) {
System.out.println("Отправка сообщения с ключом [" + key + "]: " + message);
kafkaTemplate.send(TOPIC, key, message);
}
}
Добавление метода в контроллер:
@RestController
@RequestMapping("/api/kafka")
public class KafkaController {
private final KafkaProducerService producerService;
@PostMapping("/send-with-key")
public String sendMessageWithKey(
@RequestParam String key,
@RequestParam String message) {
producerService.sendMessageWithKey(key, message);
return String.format("Сообщение отправлено с ключом %s: %s", key, message);
}
}
Обработка ошибок и логирование
Добавление Callback-функций
KafkaTemplate возвращает ListenableFuture, который позволяет отслеживать результат отправки:
public void sendMessageWithCallback(String key, String message) {
kafkaTemplate.send(TOPIC, key, message)
.addCallback(
result -> {
RecordMetadata metadata = result.getRecordMetadata();
System.out.printf(
"Сообщение успешно отправлено в топик %s, партиция [%d], оффсет %d%n",
metadata.topic(),
metadata.partition(),
metadata.offset()
);
},
ex -> System.err.println("Ошибка при отправке: " + ex.getMessage())
);
}
Асинхронная отправка с CompletableFuture
Для более гибкой работы с асинхронными результатами:
public CompletableFuture<SendResult<String, String>> sendMessageAsync(
String key,
String message) {
return kafkaTemplate.send(TOPIC, key, message)
.completable()
.whenComplete((result, ex) -> {
if (ex == null) {
RecordMetadata metadata = result.getRecordMetadata();
System.out.printf(
"Асинхронно отправлено в партицию %d с оффсетом %d%n",
metadata.partition(),
metadata.offset()
);
} else {
System.err.println("Ошибка асинхронной отправки: " + ex.getMessage());
}
});
}
Продвинутая конфигурация продюсера
Добавим расширенные настройки в application.properties:
# Базовые настройки
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Дополнительные настройки
spring.kafka.producer.acks=all
spring.kafka.producer.retries=3
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.properties.retry.backoff.ms=1000
acks=all— ждём подтверждения от всех брокеров.retries=3— количество попыток повторной отправки.batch-size=16384— размер батча для группировки сообщений.buffer-memory=33554432— размер буфера для сообщений.retry.backoff.ms=1000— задержка между повторными попытками.
Отправка транзакционных сообщений
Для случаев, когда нужна строгая гарантия exactly-once:
@Transactional
public void sendMessagesInTransaction(List<String> messages) {
messages.forEach(msg -> kafkaTemplate.send(TOPIC, msg));
}
Kafka позволяет использовать транзакции для отправки сообщений,
что обеспечивает гарантию, что все сообщения либо будут доставлены, либо не будут отправлены вовсе.
Выводы
Сегодня мы рассмотрели:
- Как использовать ключи для управления маршрутизацией сообщений.
- Как отслеживать успешность отправки сообщений.
- Как работать с асинхронной отправкой сообщений.
- Как настраивать параметры Kafka-продюсера для повышения надёжности.
- Как работать с транзакциями в Kafka.
Теперь наш продюсер более надёжный и готов к использованию в реальных приложениях!
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ