JavaRush /Курсы /Модуль 5. Spring /Лекция 192: Основы Spring Kafka: аннотации @KafkaListener...

Лекция 192: Основы Spring Kafka: аннотации @KafkaListener, @KafkaHandler

Модуль 5. Spring
20 уровень , 1 лекция
Открыта

Работа с сообщениями в Kafka подразумевает как отправку, так и получение данных. Если отправку сообщений мы можем осуществить с помощью KafkaTemplate, то для их обработки идеальным инструментом становится аннотация @KafkaListener. Spring Kafka делает процесс получения сообщений из топиков максимально простым и понятным, а главное — декларативным. Вам не нужно вручную писать сложный код для подключения или перехвата сообщений. Всё, что нужно — правильно настроить аннотацию.

Что касается аннотации @KafkaHandler, она берёт на себя задачу обработки различных типов сообщений в одном и том же listener-классе. Это особенно полезно, когда в одном топике могут приходить сообщения разных типов.


Аннотация @KafkaListener

Аннотация @KafkaListener используется для указания метода, который будет "слушать" определённый топик Kafka. Это самый удобный способ обработки сообщений.

Основные свойства @KafkaListener:

  • topics: Указывает, из каких топиков надо читать сообщения.
  • groupId: Задаёт идентификатор группы консьюмеров. Благодаря группам, несколько приложений (или инстансов) могут читать сообщения из одного топика, не дублируя их.
  • containerFactory: Указывает имя фабрики для настройки listener-контейнера. Используется для кастомных конфигураций.

Давайте рассмотрим пример:


@Component
public class KafkaMessageListener {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(String message) {
        System.out.println("Получено сообщение из Kafka: " + message);
    }
}

В этом небольшом фрагменте кода:

  1. Мы настроили метод listen для прослушивания топика my-topic.
  2. Указали groupId, чтобы listener мог быть частью определённой группы (важно при масштабировании приложения).

Пример обработки сообщений в Bean-компоненте

Очень часто удобно вынести обработку сообщений в отдельный компонент. Давайте расширим предыдущий пример:


@Component
public class MyMessageProcessor {

    @KafkaListener(topics = "orders", groupId = "order-processors")
    public void processOrder(String orderMessage) {
        System.out.println("Обрабатываем заказ: " + orderMessage);
        // Здесь может быть логика обработки заказа
    }
}

Теперь метод processOrder будет вызван всякий раз, когда в топике orders появится новое сообщение.


Настройка нескольких топиков

Если вы хотите слушать сразу несколько топиков, это тоже легко сделать. Просто передайте их через запятую:


@KafkaListener(topics = {"topic1", "topic2"}, groupId = "multi-topic-listener")
public void listenMultipleTopics(String message) {
    System.out.println("Сообщение из одного из топиков: " + message);
}

Аннотация @KafkaHandler

@KafkaHandler используется в сочетании с аннотацией @KafkaListener, чтобы обрабатывать различные типы сообщений внутри одного класса. Для этого целевой класс должен быть помечен аннотацией @KafkaListener, а методы обработки сообщений — @KafkaHandler.

Приведём пример использования. Представим ситуацию, где из одного топика приходят два типа сообщений — строки и объекты:


@Component
@KafkaListener(id = "multi-handler", topics = "multi-type-topic", groupId = "handlers-group")
public class MultiTypeMessageListener {

    @KafkaHandler
    public void handleStringMessage(String message) {
        System.out.println("Обработано строковое сообщение: " + message);
    }

    @KafkaHandler
    public void handleCustomObject(CustomObject object) {
        System.out.println("Обработан объект: " + object);
    }
}

Здесь:

  1. Метод handleStringMessage обработает все строки.
  2. Метод handleCustomObject обработает сообщения, которые могут быть десериализованы в CustomObject.

Для корректной работы KafkaHandler важно, чтобы сообщения приходили в нужном формате (например, JSON, Avro), и у вас были правильно настроены сериализаторы/десериализаторы. Мы подробнее коснёмся этого в следующих лекциях.

Ошибки и вызовы при использовании @KafkaHandler

  • Если сообщение не соответствует одному из указанных типов, вы получите ошибку. Для таких случаев можно предусмотреть общий метод обработки или использовать подход с объединённым типом.
  • Обязательно определите обработчик "по умолчанию". Пример:

@KafkaHandler(isDefault = true)
public void handleDefaultMessage(Object message) {
    System.out.println("Неопознанное сообщение: " + message);
}

Обработка ошибок при получении сообщений

Когда что-то идёт не так (например, неправильный формат сообщения или ошибка десериализации), приложение может "споткнуться". Обратимся к теме обработки ошибок. Для этого в Spring Kafka есть несколько подходов, но сейчас мы рассмотрим базовый — с помощью @KafkaListener.

Пример обработки ошибок:


@KafkaListener(topics = "error-topic", groupId = "error-handler-group")
public void listenWithErrorHandling(String message) {
    try {
        System.out.println("Обрабатываем сообщение: " + message);
        // Здесь ваша логика обработки сообщения
    } catch (Exception e) {
        System.err.println("Ошибка при обработке сообщения: " + e.getMessage());
    }
}

Этот механизм позволяет минимизировать сбои в обработке, сохранив общую целостность приложения.


Практическое задание: настройка @KafkaListener и @KafkaHandler

  1. Создайте новый топик Kafka, например, user-updates.
  2. Напишите два @KafkaListener-метода:
    • Один для обработки строковых сообщений, например, с именами пользователей.
    • Другой для обработки JSON-объектов, представляющих данные пользователей.
  3. Реализуйте базовую обработку ошибок, чтобы выводить сообщение об ошибке вместо падения приложения.

Пример структуры кода:


@Component
public class UserUpdatesListener {

    @KafkaListener(topics = "user-updates", groupId = "user-listeners")
    public void handleStringMessages(String message) {
        System.out.println("Обработка имени пользователя: " + message);
    }

    @KafkaHandler
    public void handleUserObject(User user) {
        System.out.println("Обработка объекта пользователя: " + user);
    }

    @KafkaHandler(isDefault = true)
    public void handleUnknownMessage(Object message) {
        System.out.println("Неизвестный тип сообщения: " + message);
    }
}

Заключение темы

В этой лекции вы познакомились с двумя ключевыми инструментами обработки сообщений из Kafka в Spring Boot: аннотациями @KafkaListener и @KafkaHandler. Вы узнали, как слушать сообщения из топиков, обрабатывать их разными методами и настраивать группы консьюмеров. Практические примеры и задания помогут вам отточить эти навыки. В следующей лекции мы погрузимся в конфигурацию Kafka в Spring Boot, где разберём Java-классы настройки и, конечно, отточим мастерство работы с Kafka.

Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ