У попередній лекції ми познайомилися з Testcontainers — інструментом, який дозволяє піднімати реальні бази даних і інші сервіси (включно з Kafka і RabbitMQ) в тестовому оточенні. Це буде ключовим моментом в сьогоднішній практиці.
Чому тестування асинхронних процесів — виклик?
Асинхронні процеси додають свою «родзинку» до тестування. На відміну від "звичайних синхронних запитів", де результат повертається одразу (або не повертається, якщо щось зламалося), тут ми маємо справу з:
- Чергами повідомлень: передача інформації через брокери даних (Kafka, RabbitMQ).
- Затримками: іноді повідомлення обробляються не миттєво.
- Перевіркою кількох компонентів: продюсерів, консьюмерів і самого брокера (в якого теж бувають свої капризи).
Основні задачі тестування асинхронних процесів:
- Переконатися, що повідомлення успішно відправлене продюсером.
- Перевірити, що консьюмер правильно обробив повідомлення.
- Переконатися, що повідомлення не губляться і відповідають потрібним гарантіям доставки.
Інструменти для тестування Kafka і RabbitMQ
Для тестування асинхронних процесів використаємо:
- JUnit 5: Для написання тестів.
- Testcontainers: Для підняття ізольованих інстансів Kafka і RabbitMQ.
- Spring Kafka Test: Бібліотека для тестування Kafka в Spring Boot.
- Spring AMQP Test: Бібліотека для тестування RabbitMQ.
- Awaitility: Для зручного очікування результатів асинхронних операцій.
Як тестувати Kafka?
1. Підготовка оточення за допомогою Testcontainers
Testcontainers чудово підходить для підняття Kafka в тестовому оточенні. Почнемо з конфігурації.
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class KafkaTest {
@Test
void testKafkaContainer() {
// Запускаємо Kafka за допомогою Testcontainers
KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
kafkaContainer.start();
// Перевірка, що Kafka запущена
assertNotNull(kafkaContainer.getBootstrapServers(), "Kafka не запущена");
kafkaContainer.stop();
}
}
Що ми зробили?
- Запустили Kafka всередині Docker-контейнера.
- Перевірили, що Kafka запустилась, і ми можемо отримати адресу її сервера.
2. Тестування продюсера
Тепер створимо тест, щоб перевірити, чи коректно повідомлення відправляються в Kafka.
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.beans.factory.annotation.Autowired;
import static org.assertj.core.api.Assertions.assertThat;
@EmbeddedKafka(partitions = 1, topics = {"test-topic"})
public class KafkaProducerTest {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Test
void testSendMessage() throws Exception {
// Відправляємо повідомлення в Kafka
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "message");
RecordMetadata metadata = kafkaTemplate.send(record).get().getRecordMetadata();
// Перевіряємо, що повідомлення успішно відправлено
assertThat(metadata.topic()).isEqualTo("test-topic");
}
}
Що ми зробили?
- Використали вбудований Kafka брокер (
@EmbeddedKafka) для тестування, щоб не піднімати тестову інфраструктуру вручну. - Відправили повідомлення за допомогою
KafkaTemplateі перевірили, що воно успішно записано в топік.
3. Тестування консьюмера
Консьюмери — це наші «працьовиті робітники», які обробляють повідомлення. Ми перевіримо, що вони працюють коректно.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@EmbeddedKafka(partitions = 1, topics = {"test-topic"})
public class KafkaConsumerTest {
@Test
void testConsumeMessage() {
// Налаштовуємо тестовий консьюмер
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", kafkaBroker);
DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<String, String> consumer = consumerFactory.createConsumer();
// Підписуємося на топік і перевіряємо отримане повідомлення
consumer.subscribe(Collections.singletonList("test-topic"));
ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
assertThat(record.value()).isEqualTo("message");
}
}
Як тестувати RabbitMQ?
1. Підготовка оточення
Піднімемо RabbitMQ за допомогою Testcontainers.
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.RabbitMQContainer;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class RabbitMQTest {
@Test
void testRabbitMQContainer() {
RabbitMQContainer rabbitMQContainer = new RabbitMQContainer("rabbitmq:3-management");
rabbitMQContainer.start();
assertTrue(rabbitMQContainer.isRunning());
rabbitMQContainer.stop();
}
}
2. Тестування продюсера
Перевіримо, що повідомлення відправляється в чергу RabbitMQ.
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class RabbitProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testSendMessage() {
// Відправляємо повідомлення
String queue = "test-queue";
String message = "Hello RabbitMQ";
rabbitTemplate.convertAndSend(queue, message);
// Перевіряємо, що повідомлення відправлено
String receivedMessage = (String) rabbitTemplate.receiveAndConvert(queue);
assertEquals(message, receivedMessage);
}
}
3. Тестування консьюмера
Напишемо тест для перевірки консьюмера.
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class RabbitConsumerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MessageListenerContainer container;
@Test
void testConsumeMessage() {
// Відправляємо повідомлення
String queue = "test-queue";
String message = "Hello RabbitMQ";
rabbitTemplate.convertAndSend(queue, message);
// Перевіряємо, що повідомлення оброблене
container.start();
String receivedMessage = (String) rabbitTemplate.receiveAndConvert(queue);
assertTrue(receivedMessage.equals(message));
container.stop();
}
}
Що важливо пам'ятати?
- Асинхронність — це складно: враховуйте затримки в обробці повідомлень. Використовуйте бібліотеки, такі як Awaitility, для очікування.
- Не забувайте ізолювати тести: Testcontainers чи @EmbeddedKafka/@RabbitMQ допоможуть у цьому.
- Kafka і RabbitMQ — це черги повідомлень з різними особливостями, тому підходи до тестування можуть трохи відрізнятися.
Тепер ви озброєні всім необхідним для тестування асинхронних процесів.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ