JavaRush /Курсы /Модуль 5. Spring /Лекция 270: Тестирование асинхронных процессов (Kafka, Ra...

Лекция 270: Тестирование асинхронных процессов (Kafka, RabbitMQ)

Модуль 5. Spring
27 уровень , 9 лекция
Открыта

В прошлой лекции мы познакомились с Testcontainers: инструментом, который позволяет поднимать реальные базы данных и другие сервисы (включая Kafka и RabbitMQ) в тестовом окружении. Это будет ключевым моментом в сегодняшней практике.


Почему тестирование асинхронных процессов — это вызов?

Асинхронные процессы добавляют свою изюминку к тестированию. В отличие от "обычных синхронных запросов", где результат возвращается немедленно (или не возвращается, если всё сломалось), здесь мы имеем дело с:

  • Очередями сообщений: Передача информации через брокеры данных (Kafka, RabbitMQ).
  • Задержками: Иногда сообщения обрабатываются не мгновенно.
  • Проверкой нескольких компонентов: Продюсеров, консьюмеров и самого брокера (у которого тоже бывают свои причуды).

Основные задачи тестирования асинхронных процессов:

  1. Убедиться, что сообщение успешно отправлено продюсером.
  2. Проверить, что консьюмер правильно обработал сообщение.
  3. Убедиться, что сообщения не теряются и соответствуют необходимым гарантиям доставки.

Инструменты для тестирования Kafka и RabbitMQ

Для тестирования асинхронных процессов мы будем использовать:

  • JUnit 5: Для написания тестов.
  • Testcontainers: Для поднятия изолированных инстансов Kafka и RabbitMQ.
  • Spring Kafka Test: Библиотека для тестирования Kafka в Spring Boot.
  • Spring AMQP Test: Библиотека для тестирования RabbitMQ.
  • Awaitility: Для удобного ожидания результатов асинхронных операций.

Как тестировать Kafka?

1. Подготовка окружения с помощью Testcontainers

Testcontainers отлично подходит для запуска Kafka в тестовом окружении. Давайте начнём с конфигурации.


import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import static org.junit.jupiter.api.Assertions.assertNotNull;

public class KafkaTest {

    @Test
    void testKafkaContainer() {
        // Запускаем Kafka с помощью Testcontainers
        KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
        kafkaContainer.start();

        // Проверка, что Kafka запущена
        assertNotNull(kafkaContainer.getBootstrapServers(), "Kafka не запущена");

        kafkaContainer.stop();
    }
}

Что мы сделали?

  • Запустили Kafka внутри Docker-контейнера.
  • Проверили, что Kafka запустилась, и мы можем получить адрес её сервера.

2. Тестирование продюсера

Теперь создадим тест для проверки, правильно ли сообщения отправляются в Kafka.


import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.beans.factory.annotation.Autowired;
import static org.assertj.core.api.Assertions.assertThat;

@EmbeddedKafka(partitions = 1, topics = {"test-topic"})
public class KafkaProducerTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    void testSendMessage() throws Exception {
        // Отправляем сообщение в Kafka
        ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "message");
        RecordMetadata metadata = kafkaTemplate.send(record).get().getRecordMetadata();

        // Проверяем, что сообщение успешно отправлено
        assertThat(metadata.topic()).isEqualTo("test-topic");
    }
}

Что мы сделали?

  • Использовали встроенный Kafka брокер (@EmbeddedKafka) для тестирования, чтобы не поднимать тестовую инфраструктуру вручную.
  • Отправили сообщение с помощью KafkaTemplate и проверили, что оно успешно записано в топик.

3. Тестирование консьюмера

Консьюмеры — это наши "усердные работники", которые обрабатывают сообщения. Мы проверим, что они работают корректно.


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

@EmbeddedKafka(partitions = 1, topics = {"test-topic"})
public class KafkaConsumerTest {

    @Test
    void testConsumeMessage() {
        // Настраиваем тестовый консьюмер
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", kafkaBroker);
        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<String, String> consumer = consumerFactory.createConsumer();

        // Подписываемся на топик и проверяем полученное сообщение
        consumer.subscribe(Collections.singletonList("test-topic"));
        ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");

        assertThat(record.value()).isEqualTo("message");
    }
}

Как тестировать RabbitMQ?

1. Подготовка окружения

Поднимем RabbitMQ при помощи Testcontainers.


import org.junit.jupiter.api.Test;
import org.testcontainers.containers.RabbitMQContainer;

import static org.junit.jupiter.api.Assertions.assertTrue;

public class RabbitMQTest {

    @Test
    void testRabbitMQContainer() {
        RabbitMQContainer rabbitMQContainer = new RabbitMQContainer("rabbitmq:3-management");
        rabbitMQContainer.start();

        assertTrue(rabbitMQContainer.isRunning());
        rabbitMQContainer.stop();
    }
}

2. Тестирование продюсера

Проверим, что сообщение отправляется в очередь RabbitMQ.


import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class RabbitProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void testSendMessage() {
        // Отправляем сообщение
        String queue = "test-queue";
        String message = "Hello RabbitMQ";
        rabbitTemplate.convertAndSend(queue, message);

        // Проверяем, что сообщение отправлено
        String receivedMessage = (String) rabbitTemplate.receiveAndConvert(queue);
        assertEquals(message, receivedMessage);
    }
}

3. Тестирование консьюмера

Напишем тест для проверки консьюмера.


import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;

import static org.junit.jupiter.api.Assertions.assertTrue;

public class RabbitConsumerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MessageListenerContainer container;

    @Test
    void testConsumeMessage() {
        // Отправляем сообщение
        String queue = "test-queue";
        String message = "Hello RabbitMQ";
        rabbitTemplate.convertAndSend(queue, message);

        // Проверяем, что сообщение обработано
        container.start();
        String receivedMessage = (String) rabbitTemplate.receiveAndConvert(queue);
        assertTrue(receivedMessage.equals(message));
        container.stop();
    }
}

Что важно помнить?

  1. Асинхронность — это сложно: учитывайте задержки в обработке сообщений. Используйте библиотеки, такие как Awaitility, для ожидания.
  2. Не забывайте изолировать тесты: Testcontainers или @EmbeddedKafka/@RabbitMQ помогут с этим.
  3. Kafka и RabbitMQ — это очереди сообщений с разными особенностями, поэтому подходы к тестированию могут слегка отличаться.

Теперь вы вооружены всем необходимым для тестирования асинхронных процессов.

Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ