JavaRush /Курсы /Модуль 5. Spring /Лекция 198: Практика: создание консьюмера для получения с...

Лекция 198: Практика: создание консьюмера для получения сообщений из топика

Модуль 5. Spring
20 уровень , 7 лекция
Открыта

В прошлой лекции мы создали базовый консьюмер, который умеет получать и обрабатывать простые текстовые сообщения. Сегодня мы научимся работать со сложными объектами, настроим продвинутое логирование и рассмотрим реальные сценарии использования Kafka Consumer.

Работа с бизнес-объектами

В реальных приложениях мы обычно работаем не с простыми строками, а с бизнес-объектами. Давайте создадим типичный пример — обработку заказов.

Модель данных


import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;

@Data
public class Order {
   private String id;
   private String customerName;
   private String product;
   private int quantity;
   private OrderStatus status;

   public enum OrderStatus {
       NEW, PROCESSING, COMPLETED, CANCELLED
   }
}

Конфигурация для работы с JSON


@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Order> orderConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

        return new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            new JsonDeserializer<>(Order.class)
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order> orderKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(orderConsumerFactory());
        return factory;
    }
}

Продвинутая обработка сообщений

Сервис обработки заказов


@Service
@Slf4j
public class OrderConsumer {

    private final OrderService orderService;

    public OrderConsumer(OrderService orderService) {
        this.orderService = orderService;
    }

    @KafkaListener(
        topics = "orders",
        groupId = "order-processing-group",
        containerFactory = "orderKafkaListenerContainerFactory"
    )
    public void processOrder(Order order) {
        log.info("Получен новый заказ: {}", order);

        try {
            order.setStatus(OrderStatus.PROCESSING);
            orderService.processOrder(order);
            log.info("Заказ {} успешно обработан", order.getId());
        } catch (Exception e) {
            log.error("Ошибка при обработке заказа {}: {}", order.getId(), e.getMessage());
            order.setStatus(OrderStatus.CANCELLED);
            // Здесь может быть логика отправки уведомления об ошибке
        }
    }
}

Обработка ошибок и повторные попытки

Настройка политики повторных попыток


@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order> orderKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(orderConsumerFactory());

        // Настройка повторных попыток
        factory.setCommonErrorHandler(new DefaultErrorHandler(
            new FixedBackOff(5000L, 3) // 3 попытки с интервалом 5 секунд
        ));

        return factory;
    }
}

Обработка специфических ошибок


@KafkaListener(topics = "orders", groupId = "order-processing-group")
public void processOrder(Order order) {
    try {
        validateOrder(order);
        processValidOrder(order);
    } catch (InvalidOrderException e) {
        log.error("Получен некорректный заказ: {}", e.getMessage());
        // Некорректные заказы не обрабатываем повторно
        sendToDeadLetterQueue(order);
    } catch (TemporaryProcessingException e) {
        log.warn("Временная ошибка обработки: {}", e.getMessage());
        // Временные ошибки будут обработаны повторно
        throw e;
    }
}

Мониторинг и логирование

Добавление метрик


@Service
@Slf4j
public class OrderConsumer {
    private final Counter processedOrders =
        Counter.builder("kafka.orders.processed")
            .description("Количество обработанных заказов")
            .register(Metrics.globalRegistry);

    private final Timer processingTime =
        Timer.builder("kafka.orders.processing_time")
            .description("Время обработки заказа")
            .register(Metrics.globalRegistry);

    @KafkaListener(topics = "orders", groupId = "order-processing-group")
    public void processOrder(Order order) {
        processingTime.record(() -> {
            try {
                processOrderInternal(order);
                processedOrders.increment();
            } catch (Exception e) {
                log.error("Ошибка обработки", e);
            }
        });
    }
}

Практические сценарии использования

Сценарий 1: асинхронная обработка заказов


@Service
public class OrderProcessor {

    @KafkaListener(topics = "new-orders")
    public void processNewOrder(Order order) {
        // 1. Валидация заказа
        validateOrder(order);

        // 2. Проверка наличия товара
        checkInventory(order);

        // 3. Резервирование товара
        reserveItems(order);

        // 4. Отправка подтверждения
        sendConfirmation(order);
    }
}

Сценарий 2: распределённая обработка событий


@Service
public class DistributedEventProcessor {

    @KafkaListener(topics = "events", concurrency = "3")
    public void processEvent(Event event) {
        switch (event.getType()) {
            case USER_REGISTRATION:
                handleUserRegistration(event);
                break;
            case PAYMENT_RECEIVED:
                handlePayment(event);
                break;
            case ORDER_STATUS_CHANGED:
                handleOrderStatusChange(event);
                break;
        }
    }
}

Заключение

Теперь у нас есть полноценный консьюмер, готовый к работе в production-окружении. Он умеет:

  • Обрабатывать сложные бизнес-объекты
  • Корректно обрабатывать ошибки
  • Предоставлять метрики для мониторинга
  • Масштабироваться под нагрузкой
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ