Работа с сообщениями в Kafka подразумевает как отправку, так и получение данных. Если отправку сообщений мы можем осуществить с помощью KafkaTemplate, то для их обработки идеальным инструментом становится аннотация @KafkaListener. Spring Kafka делает процесс получения сообщений из топиков максимально простым и понятным, а главное — декларативным. Вам не нужно вручную писать сложный код для подключения или перехвата сообщений. Всё, что нужно — правильно настроить аннотацию.
Что касается аннотации @KafkaHandler, она берёт на себя задачу обработки различных типов сообщений в одном и том же listener-классе. Это особенно полезно, когда в одном топике могут приходить сообщения разных типов.
Аннотация @KafkaListener
Аннотация @KafkaListener используется для указания метода, который будет "слушать" определённый топик Kafka. Это самый удобный способ обработки сообщений.
Основные свойства @KafkaListener:
topics: Указывает, из каких топиков надо читать сообщения.groupId: Задаёт идентификатор группы консьюмеров. Благодаря группам, несколько приложений (или инстансов) могут читать сообщения из одного топика, не дублируя их.containerFactory: Указывает имя фабрики для настройки listener-контейнера. Используется для кастомных конфигураций.
Давайте рассмотрим пример:
@Component
public class KafkaMessageListener {
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(String message) {
System.out.println("Получено сообщение из Kafka: " + message);
}
}
В этом небольшом фрагменте кода:
- Мы настроили метод
listenдля прослушивания топикаmy-topic. - Указали
groupId, чтобы listener мог быть частью определённой группы (важно при масштабировании приложения).
Пример обработки сообщений в Bean-компоненте
Очень часто удобно вынести обработку сообщений в отдельный компонент. Давайте расширим предыдущий пример:
@Component
public class MyMessageProcessor {
@KafkaListener(topics = "orders", groupId = "order-processors")
public void processOrder(String orderMessage) {
System.out.println("Обрабатываем заказ: " + orderMessage);
// Здесь может быть логика обработки заказа
}
}
Теперь метод processOrder будет вызван всякий раз, когда в топике orders появится новое сообщение.
Настройка нескольких топиков
Если вы хотите слушать сразу несколько топиков, это тоже легко сделать. Просто передайте их через запятую:
@KafkaListener(topics = {"topic1", "topic2"}, groupId = "multi-topic-listener")
public void listenMultipleTopics(String message) {
System.out.println("Сообщение из одного из топиков: " + message);
}
Аннотация @KafkaHandler
@KafkaHandler используется в сочетании с аннотацией @KafkaListener, чтобы обрабатывать различные типы сообщений внутри одного класса. Для этого целевой класс должен быть помечен аннотацией @KafkaListener, а методы обработки сообщений — @KafkaHandler.
Приведём пример использования. Представим ситуацию, где из одного топика приходят два типа сообщений — строки и объекты:
@Component
@KafkaListener(id = "multi-handler", topics = "multi-type-topic", groupId = "handlers-group")
public class MultiTypeMessageListener {
@KafkaHandler
public void handleStringMessage(String message) {
System.out.println("Обработано строковое сообщение: " + message);
}
@KafkaHandler
public void handleCustomObject(CustomObject object) {
System.out.println("Обработан объект: " + object);
}
}
Здесь:
- Метод
handleStringMessageобработает все строки. - Метод
handleCustomObjectобработает сообщения, которые могут быть десериализованы вCustomObject.
Для корректной работы KafkaHandler важно, чтобы сообщения приходили в нужном формате (например, JSON, Avro), и у вас были правильно настроены сериализаторы/десериализаторы. Мы подробнее коснёмся этого в следующих лекциях.
Ошибки и вызовы при использовании @KafkaHandler
- Если сообщение не соответствует одному из указанных типов, вы получите ошибку. Для таких случаев можно предусмотреть общий метод обработки или использовать подход с объединённым типом.
- Обязательно определите обработчик "по умолчанию". Пример:
@KafkaHandler(isDefault = true)
public void handleDefaultMessage(Object message) {
System.out.println("Неопознанное сообщение: " + message);
}
Обработка ошибок при получении сообщений
Когда что-то идёт не так (например, неправильный формат сообщения или ошибка десериализации), приложение может "споткнуться". Обратимся к теме обработки ошибок. Для этого в Spring Kafka есть несколько подходов, но сейчас мы рассмотрим базовый — с помощью @KafkaListener.
Пример обработки ошибок:
@KafkaListener(topics = "error-topic", groupId = "error-handler-group")
public void listenWithErrorHandling(String message) {
try {
System.out.println("Обрабатываем сообщение: " + message);
// Здесь ваша логика обработки сообщения
} catch (Exception e) {
System.err.println("Ошибка при обработке сообщения: " + e.getMessage());
}
}
Этот механизм позволяет минимизировать сбои в обработке, сохранив общую целостность приложения.
Практическое задание: настройка @KafkaListener и @KafkaHandler
- Создайте новый топик Kafka, например,
user-updates. - Напишите два
@KafkaListener-метода:- Один для обработки строковых сообщений, например, с именами пользователей.
- Другой для обработки JSON-объектов, представляющих данные пользователей.
- Реализуйте базовую обработку ошибок, чтобы выводить сообщение об ошибке вместо падения приложения.
Пример структуры кода:
@Component
public class UserUpdatesListener {
@KafkaListener(topics = "user-updates", groupId = "user-listeners")
public void handleStringMessages(String message) {
System.out.println("Обработка имени пользователя: " + message);
}
@KafkaHandler
public void handleUserObject(User user) {
System.out.println("Обработка объекта пользователя: " + user);
}
@KafkaHandler(isDefault = true)
public void handleUnknownMessage(Object message) {
System.out.println("Неизвестный тип сообщения: " + message);
}
}
Заключение темы
В этой лекции вы познакомились с двумя ключевыми инструментами обработки сообщений из Kafka в Spring Boot: аннотациями @KafkaListener и @KafkaHandler. Вы узнали, как слушать сообщения из топиков, обрабатывать их разными методами и настраивать группы консьюмеров. Практические примеры и задания помогут вам отточить эти навыки. В следующей лекции мы погрузимся в конфигурацию Kafka в Spring Boot, где разберём Java-классы настройки и, конечно, отточим мастерство работы с Kafka.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ