JavaRush /Курсы /Модуль 5. Spring /Лекция 199: Обработка сообщений в асинхронном режиме

Лекция 199: Обработка сообщений в асинхронном режиме

Модуль 5. Spring
20 уровень , 8 лекция
Открыта

Мы проделали значительный путь в изучении интеграции Spring Boot с Kafka. В прошлых лекциях вы узнали, что такое Spring Kafka, как настроить конфигурацию для работы с Kafka, используя KafkaTemplate для отправки сообщений, и даже реализовали собственного продюсера и консьюмера для взаимодействия с топиками Kafka. Вы научились использовать аннотации @KafkaListener для обработки входящих сообщений и стали уверенно запускать свои приложения для тестирования. Однако всё это делалось в синхронном режиме. Сегодня мы добавим немного магии асинхронности, чтобы наши приложения стали ещё более эффективными.


Асинхронная обработка сообщений на практике

Представим, что вы разрабатываете систему для крупного интернет-магазина электроники. Каждый день поступают тысячи заказов, и каждый заказ требует:

  • Проверки наличия товара на складе
  • Резервирования товара
  • Проверки платежной информации
  • Отправки уведомления клиенту
  • Передачи информации в службу доставки

Если обрабатывать эти операции последовательно, каждый заказ будет занимать много времени. Это всё равно что один сотрудник магазина пытается сделать всё сам: сначала идёт на склад проверять наличие, потом в бухгалтерию проверять оплату, затем звонит курьеру, и только после этого берётся за следующий заказ.

Гораздо эффективнее работает асинхронный подход. Это как команда сотрудников, где каждый занимается своей частью работы одновременно: один проверяет склад, второй обрабатывает оплату, третий готовит документы для доставки. Kafka отлично подходит для такой схемы работы, позволяя распределить нагрузку и ускорить обработку заказов.


Как настроить асинхронную обработку сообщений?

Асинхронная обработка сообщений в Spring Kafka — это симбиоз средств, которые предоставляет Spring, и возможностей асинхронного программирования в Java. Здесь нам на помощь придут следующие механизмы:

  1. Многопоточная обработка через KafkaListenerContainer.
  2. Пользовательские ExecutorService для управления потоками.
  3. Ретраи и обработка ошибок, чтобы избежать провалов при асинхронной обработке.

Настройка асинхронности: шаг за шагом

1. Использование аннотации @KafkaListener с параметром concurrency

Spring Kafka позволяет запускать несколько потоков для обработки сообщений из одного топика с помощью параметра concurrency. Это помогает распределить нагрузку и процессить сообщения быстрее.


import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class AsyncKafkaConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-group", concurrency = "3")
    public void processMessage(String message) {
        System.out.println("Received message: " + message);
        // Имитируем долгую обработку
        try {
            Thread.sleep(3000); // 3 секунды на обработку сообщения
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("Processed message: " + message);
    }
}

Как это работает?

  • concurrency = "3": Указывает Spring Kafka использовать три потока для обработки сообщений из топика.
  • Каждый поток обрабатывает своё сообщение, что увеличивает производительность.

Теперь, если отправить 9 сообщений в топик, они будут обработаны в 3 потока параллельно.

2. Настройка ConcurrentKafkaListenerContainerFactory

Иногда стандартной настройки недостаточно, и вам нужно указать свой пул потоков для обработки сообщений. Вот пример настройки кастомного ConcurrentKafkaListenerContainerFactory:


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

@Configuration
public class KafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        // Кастомный ExecutorService для управления потоками
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        factory.setConcurrency(5); // 5 потоков
        factory.getContainerProperties().setConsumerExecutor(executorService);

        return factory;
    }
}

Теперь ваш консьюмер будет использовать 5 потоков, управляемых вашим кастомным ExecutorService.

3. Асинхронность с использованием CompletableFuture

Иногда обработка сообщений требует вызова функции, которая возвращает результат только через какое-то время (например, запросов к удалённым сервисам). В этом случае на помощь приходит CompletableFuture.


import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

@Service
public class AsyncProcessingConsumer {

    @KafkaListener(topics = "async-topic", groupId = "async-group")
    public void consumeMessage(String message) {
        CompletableFuture.runAsync(() -> {
            System.out.println("Processing message asynchronously: " + message);
            try {
                Thread.sleep(2000); // Имитация сложной обработки
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Finished processing message: " + message);
        });
    }
}

Здесь каждое сообщение обрабатывается в отдельном потоке, что позволяет не блокировать основной поток @KafkaListener.

4. Обработка ошибок

Асинхронная обработка часто сопряжена с нестабильностью: удалённый сервис может быть недоступен, сообщение может быть повреждено и т.д. Чтобы справляться с такими ситуациями, мы можем использовать механизмы ретраев и Backoff в Kafka.


import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ContainerStoppingErrorHandler;
import org.springframework.stereotype.Component;

@Component
public class ErrorHandlingConsumer {

    @KafkaListener(topics = "retry-topic", groupId = "retry-group",
            errorHandler = "customErrorHandler")
    public void consumeMessage(String message) {
        System.out.println("Processing message: " + message);
        if (message.contains("error")) {
            throw new RuntimeException("Error occurred while processing message: " + message);
        }
    }
}

Конфигурируем кастомный ErrorHandler:


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.ContainerStoppingErrorHandler;

@Configuration
public class KafkaErrorHandlerConfig {

    @Bean
    public ContainerStoppingErrorHandler customErrorHandler() {
        return new ContainerStoppingErrorHandler();
    }
}

В реальной жизни вместо остановки контейнера лучше настроить рестарт через ретраи или отложенную повторную обработку задачи.


Практика: настройка и тестирование

Для закрепления материала предлагаем создать небольшой проект:

  1. Отправка сообщений через Kafka-продюсера: Настройте продюсера для отправки текстовых сообщений в топик async-topic.
  2. Обработка сообщений с помощью асинхронного консьюмера: Реализуйте обработку сообщений через CompletableFuture для обработки сложных задач.
  3. Тестирование производительности: Отправьте 1000 сообщений и посмотрите, как асинхронность помогает обрабатывать их быстрее. Используйте @KafkaListener с concurrency и кастомным пулом потоков.

Типичные ошибки и как их избежать

  1. Потоки блокируются при обработке длительных задач. Если используете асинхронность, избегайте вызовов блокирующих методов. Переписывайте их на асинхронные аналоги или используйте CompletableFuture.
  2. Ошибка настройки concurrency приводит к перегрузке системы. Если увеличить параллелизм до 100 потоков, а ваша машина не справляется с таким количеством, приложение начнёт падать. Оценивайте ресурсы перед настройкой.
  3. Отсутствие обработки ошибок в потоках. В асинхронной обработке ошибки могут теряться. Всегда логируйте их и добавляйте ретраи при необходимости.

Итоговая структура проекта

Ваш проект должен выглядеть так:


src/main/java
└── com.yourcompany.kafka
    ├── config
    │   ├── KafkaConfig.java     // Конфигурация Kafka и пула потоков
    │   └── KafkaErrorHandlerConfig.java // Обработчик ошибок
    ├── producer
    │   └── MessageProducer.java // Kafka продюсер
    └── consumer
        ├── AsyncKafkaConsumer.java // Асинхронный Kafka консьюмер
        └── ErrorHandlingConsumer.java // Консьюмер с обработкой ошибок
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ