В прошлой лекции мы создали базовый консьюмер, который умеет получать и обрабатывать простые текстовые сообщения. Сегодня мы научимся работать со сложными объектами, настроим продвинутое логирование и рассмотрим реальные сценарии использования Kafka Consumer.
Работа с бизнес-объектами
В реальных приложениях мы обычно работаем не с простыми строками, а с бизнес-объектами. Давайте создадим типичный пример — обработку заказов.
Модель данных
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
@Data
public class Order {
private String id;
private String customerName;
private String product;
private int quantity;
private OrderStatus status;
public enum OrderStatus {
NEW, PROCESSING, COMPLETED, CANCELLED
}
}
Конфигурация для работы с JSON
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Order> orderConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
new JsonDeserializer<>(Order.class)
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> orderKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Order> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(orderConsumerFactory());
return factory;
}
}
Продвинутая обработка сообщений
Сервис обработки заказов
@Service
@Slf4j
public class OrderConsumer {
private final OrderService orderService;
public OrderConsumer(OrderService orderService) {
this.orderService = orderService;
}
@KafkaListener(
topics = "orders",
groupId = "order-processing-group",
containerFactory = "orderKafkaListenerContainerFactory"
)
public void processOrder(Order order) {
log.info("Получен новый заказ: {}", order);
try {
order.setStatus(OrderStatus.PROCESSING);
orderService.processOrder(order);
log.info("Заказ {} успешно обработан", order.getId());
} catch (Exception e) {
log.error("Ошибка при обработке заказа {}: {}", order.getId(), e.getMessage());
order.setStatus(OrderStatus.CANCELLED);
// Здесь может быть логика отправки уведомления об ошибке
}
}
}
Обработка ошибок и повторные попытки
Настройка политики повторных попыток
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> orderKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Order> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(orderConsumerFactory());
// Настройка повторных попыток
factory.setCommonErrorHandler(new DefaultErrorHandler(
new FixedBackOff(5000L, 3) // 3 попытки с интервалом 5 секунд
));
return factory;
}
}
Обработка специфических ошибок
@KafkaListener(topics = "orders", groupId = "order-processing-group")
public void processOrder(Order order) {
try {
validateOrder(order);
processValidOrder(order);
} catch (InvalidOrderException e) {
log.error("Получен некорректный заказ: {}", e.getMessage());
// Некорректные заказы не обрабатываем повторно
sendToDeadLetterQueue(order);
} catch (TemporaryProcessingException e) {
log.warn("Временная ошибка обработки: {}", e.getMessage());
// Временные ошибки будут обработаны повторно
throw e;
}
}
Мониторинг и логирование
Добавление метрик
@Service
@Slf4j
public class OrderConsumer {
private final Counter processedOrders =
Counter.builder("kafka.orders.processed")
.description("Количество обработанных заказов")
.register(Metrics.globalRegistry);
private final Timer processingTime =
Timer.builder("kafka.orders.processing_time")
.description("Время обработки заказа")
.register(Metrics.globalRegistry);
@KafkaListener(topics = "orders", groupId = "order-processing-group")
public void processOrder(Order order) {
processingTime.record(() -> {
try {
processOrderInternal(order);
processedOrders.increment();
} catch (Exception e) {
log.error("Ошибка обработки", e);
}
});
}
}
Практические сценарии использования
Сценарий 1: асинхронная обработка заказов
@Service
public class OrderProcessor {
@KafkaListener(topics = "new-orders")
public void processNewOrder(Order order) {
// 1. Валидация заказа
validateOrder(order);
// 2. Проверка наличия товара
checkInventory(order);
// 3. Резервирование товара
reserveItems(order);
// 4. Отправка подтверждения
sendConfirmation(order);
}
}
Сценарий 2: распределённая обработка событий
@Service
public class DistributedEventProcessor {
@KafkaListener(topics = "events", concurrency = "3")
public void processEvent(Event event) {
switch (event.getType()) {
case USER_REGISTRATION:
handleUserRegistration(event);
break;
case PAYMENT_RECEIVED:
handlePayment(event);
break;
case ORDER_STATUS_CHANGED:
handleOrderStatusChange(event);
break;
}
}
}
Заключение
Теперь у нас есть полноценный консьюмер, готовый к работе в production-окружении. Он умеет:
- Обрабатывать сложные бизнес-объекты
- Корректно обрабатывать ошибки
- Предоставлять метрики для мониторинга
- Масштабироваться под нагрузкой
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ