В прошлой лекции мы познакомились с Testcontainers: инструментом, который позволяет поднимать реальные базы данных и другие сервисы (включая Kafka и RabbitMQ) в тестовом окружении. Это будет ключевым моментом в сегодняшней практике.
Почему тестирование асинхронных процессов — это вызов?
Асинхронные процессы добавляют свою изюминку к тестированию. В отличие от "обычных синхронных запросов", где результат возвращается немедленно (или не возвращается, если всё сломалось), здесь мы имеем дело с:
- Очередями сообщений: Передача информации через брокеры данных (Kafka, RabbitMQ).
- Задержками: Иногда сообщения обрабатываются не мгновенно.
- Проверкой нескольких компонентов: Продюсеров, консьюмеров и самого брокера (у которого тоже бывают свои причуды).
Основные задачи тестирования асинхронных процессов:
- Убедиться, что сообщение успешно отправлено продюсером.
- Проверить, что консьюмер правильно обработал сообщение.
- Убедиться, что сообщения не теряются и соответствуют необходимым гарантиям доставки.
Инструменты для тестирования Kafka и RabbitMQ
Для тестирования асинхронных процессов мы будем использовать:
- JUnit 5: Для написания тестов.
- Testcontainers: Для поднятия изолированных инстансов Kafka и RabbitMQ.
- Spring Kafka Test: Библиотека для тестирования Kafka в Spring Boot.
- Spring AMQP Test: Библиотека для тестирования RabbitMQ.
- Awaitility: Для удобного ожидания результатов асинхронных операций.
Как тестировать Kafka?
1. Подготовка окружения с помощью Testcontainers
Testcontainers отлично подходит для запуска Kafka в тестовом окружении. Давайте начнём с конфигурации.
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class KafkaTest {
@Test
void testKafkaContainer() {
// Запускаем Kafka с помощью Testcontainers
KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
kafkaContainer.start();
// Проверка, что Kafka запущена
assertNotNull(kafkaContainer.getBootstrapServers(), "Kafka не запущена");
kafkaContainer.stop();
}
}
Что мы сделали?
- Запустили Kafka внутри Docker-контейнера.
- Проверили, что Kafka запустилась, и мы можем получить адрес её сервера.
2. Тестирование продюсера
Теперь создадим тест для проверки, правильно ли сообщения отправляются в Kafka.
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.beans.factory.annotation.Autowired;
import static org.assertj.core.api.Assertions.assertThat;
@EmbeddedKafka(partitions = 1, topics = {"test-topic"})
public class KafkaProducerTest {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Test
void testSendMessage() throws Exception {
// Отправляем сообщение в Kafka
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "message");
RecordMetadata metadata = kafkaTemplate.send(record).get().getRecordMetadata();
// Проверяем, что сообщение успешно отправлено
assertThat(metadata.topic()).isEqualTo("test-topic");
}
}
Что мы сделали?
- Использовали встроенный Kafka брокер (
@EmbeddedKafka) для тестирования, чтобы не поднимать тестовую инфраструктуру вручную. - Отправили сообщение с помощью
KafkaTemplateи проверили, что оно успешно записано в топик.
3. Тестирование консьюмера
Консьюмеры — это наши "усердные работники", которые обрабатывают сообщения. Мы проверим, что они работают корректно.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@EmbeddedKafka(partitions = 1, topics = {"test-topic"})
public class KafkaConsumerTest {
@Test
void testConsumeMessage() {
// Настраиваем тестовый консьюмер
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", kafkaBroker);
DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<String, String> consumer = consumerFactory.createConsumer();
// Подписываемся на топик и проверяем полученное сообщение
consumer.subscribe(Collections.singletonList("test-topic"));
ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
assertThat(record.value()).isEqualTo("message");
}
}
Как тестировать RabbitMQ?
1. Подготовка окружения
Поднимем RabbitMQ при помощи Testcontainers.
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.RabbitMQContainer;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class RabbitMQTest {
@Test
void testRabbitMQContainer() {
RabbitMQContainer rabbitMQContainer = new RabbitMQContainer("rabbitmq:3-management");
rabbitMQContainer.start();
assertTrue(rabbitMQContainer.isRunning());
rabbitMQContainer.stop();
}
}
2. Тестирование продюсера
Проверим, что сообщение отправляется в очередь RabbitMQ.
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class RabbitProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testSendMessage() {
// Отправляем сообщение
String queue = "test-queue";
String message = "Hello RabbitMQ";
rabbitTemplate.convertAndSend(queue, message);
// Проверяем, что сообщение отправлено
String receivedMessage = (String) rabbitTemplate.receiveAndConvert(queue);
assertEquals(message, receivedMessage);
}
}
3. Тестирование консьюмера
Напишем тест для проверки консьюмера.
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class RabbitConsumerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MessageListenerContainer container;
@Test
void testConsumeMessage() {
// Отправляем сообщение
String queue = "test-queue";
String message = "Hello RabbitMQ";
rabbitTemplate.convertAndSend(queue, message);
// Проверяем, что сообщение обработано
container.start();
String receivedMessage = (String) rabbitTemplate.receiveAndConvert(queue);
assertTrue(receivedMessage.equals(message));
container.stop();
}
}
Что важно помнить?
- Асинхронность — это сложно: учитывайте задержки в обработке сообщений. Используйте библиотеки, такие как Awaitility, для ожидания.
- Не забывайте изолировать тесты: Testcontainers или @EmbeddedKafka/@RabbitMQ помогут с этим.
- Kafka и RabbitMQ — это очереди сообщений с разными особенностями, поэтому подходы к тестированию могут слегка отличаться.
Теперь вы вооружены всем необходимым для тестирования асинхронных процессов.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ