JavaRush /Курси /Модуль 5. Spring /Лекція 270: Тестування асинхронних процесів (Kafka, Rabbi...

Лекція 270: Тестування асинхронних процесів (Kafka, RabbitMQ)

Модуль 5. Spring
Рівень 21 , Лекція 9
Відкрита

У попередній лекції ми познайомилися з Testcontainers — інструментом, який дозволяє піднімати реальні бази даних і інші сервіси (включно з Kafka і RabbitMQ) в тестовому оточенні. Це буде ключовим моментом в сьогоднішній практиці.


Чому тестування асинхронних процесів — виклик?

Асинхронні процеси додають свою «родзинку» до тестування. На відміну від "звичайних синхронних запитів", де результат повертається одразу (або не повертається, якщо щось зламалося), тут ми маємо справу з:

  • Чергами повідомлень: передача інформації через брокери даних (Kafka, RabbitMQ).
  • Затримками: іноді повідомлення обробляються не миттєво.
  • Перевіркою кількох компонентів: продюсерів, консьюмерів і самого брокера (в якого теж бувають свої капризи).

Основні задачі тестування асинхронних процесів:

  1. Переконатися, що повідомлення успішно відправлене продюсером.
  2. Перевірити, що консьюмер правильно обробив повідомлення.
  3. Переконатися, що повідомлення не губляться і відповідають потрібним гарантіям доставки.

Інструменти для тестування Kafka і RabbitMQ

Для тестування асинхронних процесів використаємо:

  • JUnit 5: Для написання тестів.
  • Testcontainers: Для підняття ізольованих інстансів Kafka і RabbitMQ.
  • Spring Kafka Test: Бібліотека для тестування Kafka в Spring Boot.
  • Spring AMQP Test: Бібліотека для тестування RabbitMQ.
  • Awaitility: Для зручного очікування результатів асинхронних операцій.

Як тестувати Kafka?

1. Підготовка оточення за допомогою Testcontainers

Testcontainers чудово підходить для підняття Kafka в тестовому оточенні. Почнемо з конфігурації.


import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import static org.junit.jupiter.api.Assertions.assertNotNull;

public class KafkaTest {

    @Test
    void testKafkaContainer() {
        // Запускаємо Kafka за допомогою Testcontainers
        KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
        kafkaContainer.start();

        // Перевірка, що Kafka запущена
        assertNotNull(kafkaContainer.getBootstrapServers(), "Kafka не запущена");

        kafkaContainer.stop();
    }
}

Що ми зробили?

  • Запустили Kafka всередині Docker-контейнера.
  • Перевірили, що Kafka запустилась, і ми можемо отримати адресу її сервера.

2. Тестування продюсера

Тепер створимо тест, щоб перевірити, чи коректно повідомлення відправляються в Kafka.


import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.beans.factory.annotation.Autowired;
import static org.assertj.core.api.Assertions.assertThat;

@EmbeddedKafka(partitions = 1, topics = {"test-topic"})
public class KafkaProducerTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    void testSendMessage() throws Exception {
        // Відправляємо повідомлення в Kafka
        ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "message");
        RecordMetadata metadata = kafkaTemplate.send(record).get().getRecordMetadata();

        // Перевіряємо, що повідомлення успішно відправлено
        assertThat(metadata.topic()).isEqualTo("test-topic");
    }
}

Що ми зробили?

  • Використали вбудований Kafka брокер (@EmbeddedKafka) для тестування, щоб не піднімати тестову інфраструктуру вручну.
  • Відправили повідомлення за допомогою KafkaTemplate і перевірили, що воно успішно записано в топік.

3. Тестування консьюмера

Консьюмери — це наші «працьовиті робітники», які обробляють повідомлення. Ми перевіримо, що вони працюють коректно.


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

@EmbeddedKafka(partitions = 1, topics = {"test-topic"})
public class KafkaConsumerTest {

    @Test
    void testConsumeMessage() {
        // Налаштовуємо тестовий консьюмер
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", kafkaBroker);
        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<String, String> consumer = consumerFactory.createConsumer();

        // Підписуємося на топік і перевіряємо отримане повідомлення
        consumer.subscribe(Collections.singletonList("test-topic"));
        ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");

        assertThat(record.value()).isEqualTo("message");
    }
}

Як тестувати RabbitMQ?

1. Підготовка оточення

Піднімемо RabbitMQ за допомогою Testcontainers.


import org.junit.jupiter.api.Test;
import org.testcontainers.containers.RabbitMQContainer;

import static org.junit.jupiter.api.Assertions.assertTrue;

public class RabbitMQTest {

    @Test
    void testRabbitMQContainer() {
        RabbitMQContainer rabbitMQContainer = new RabbitMQContainer("rabbitmq:3-management");
        rabbitMQContainer.start();

        assertTrue(rabbitMQContainer.isRunning());
        rabbitMQContainer.stop();
    }
}

2. Тестування продюсера

Перевіримо, що повідомлення відправляється в чергу RabbitMQ.


import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class RabbitProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void testSendMessage() {
        // Відправляємо повідомлення
        String queue = "test-queue";
        String message = "Hello RabbitMQ";
        rabbitTemplate.convertAndSend(queue, message);

        // Перевіряємо, що повідомлення відправлено
        String receivedMessage = (String) rabbitTemplate.receiveAndConvert(queue);
        assertEquals(message, receivedMessage);
    }
}

3. Тестування консьюмера

Напишемо тест для перевірки консьюмера.


import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;

import static org.junit.jupiter.api.Assertions.assertTrue;

public class RabbitConsumerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MessageListenerContainer container;

    @Test
    void testConsumeMessage() {
        // Відправляємо повідомлення
        String queue = "test-queue";
        String message = "Hello RabbitMQ";
        rabbitTemplate.convertAndSend(queue, message);

        // Перевіряємо, що повідомлення оброблене
        container.start();
        String receivedMessage = (String) rabbitTemplate.receiveAndConvert(queue);
        assertTrue(receivedMessage.equals(message));
        container.stop();
    }
}

Що важливо пам'ятати?

  1. Асинхронність — це складно: враховуйте затримки в обробці повідомлень. Використовуйте бібліотеки, такі як Awaitility, для очікування.
  2. Не забувайте ізолювати тести: Testcontainers чи @EmbeddedKafka/@RabbitMQ допоможуть у цьому.
  3. Kafka і RabbitMQ — це черги повідомлень з різними особливостями, тому підходи до тестування можуть трохи відрізнятися.

Тепер ви озброєні всім необхідним для тестування асинхронних процесів.

Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ