Сегодня мы реализуем более полный пример продюсер-консьюмер. Цели этой лекции:
- Закрепить знания об основах взаимодействия с Kafka через реализацию продюсера и консьюмера на Java.
- Настроить простое приложение, которое отправляет и получает сообщения через Kafka.
- Понять, как управлять параметрами продюсера и консьюмера в целях оптимизации.
Настройка Maven-зависимостей
Для начала нам потребуется добавить в проект зависимости Kafka-клиента. Откройте ваш pom.xml и добавьте следующий фрагмент:
<dependencies>
<!-- Kafka-клиент -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.1</version>
</dependency>
</dependencies>
Зависимость kafka-clients предоставляет API для работы с Kafka. Убедитесь, что версия Kafka, указанная здесь, совпадает с локально установленной версией Kafka.
Конфигурация Kafka
Перед тем как реализовывать код, давайте определим:
- Топик: создаем топик с именем
my-first-topic. - Брокер: указываем адрес Kafka-брокера — в данном случае
localhost:9092.
Если вы еще не создали топик, выполните следующую команду в терминале:
bin/kafka-topics.sh --create --topic my-first-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
- --partitions: количество партиций (3 в данном случае).
- --replication-factor: фактор репликации (1 для локального использования).
Теперь всё готово к написанию продюсера и консьюмера. Вот где начинает оживать магия.
Реализация продюсера
Продюсер отправляет сообщения в топик. Мы создадим простую Java-программу, которая отправляет текстовые сообщения в Kafka.
Создадим продюсер
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class SimpleProducer {
public static void main(String[] args) {
// Настройки продюсера
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Адрес Kafka-брокера
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // Сериализация ключа
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // Сериализация значения
// Создание продюсера
Producer<String, String> producer = new KafkaProducer<>(props);
try {
// Отправляем 10 сообщений
for (int i = 0; i < 10; i++) {
String key = "key-" + i;
String value = "message-" + i;
// Создаем запись (сообщение) для топика
ProducerRecord<String, String> record = new ProducerRecord<>("my-first-topic", key, value);
// Отправляем запись и обрабатываем ответ асинхронно
producer.send(record, (RecordMetadata metadata, Exception exception) -> {
if (exception == null) {
System.out.printf("Отправлено сообщение: ключ=%s, значение=%s, партиция=%d, смещение=%d%n",
key, value, metadata.partition(), metadata.offset());
} else {
exception.printStackTrace();
}
});
}
} finally {
// Закрываем продюсер
producer.close();
}
}
}
В этом примере:
- Настройки продюсера:
bootstrap.serversуказывает Kafka-брокер,key.serializerиvalue.serializerопределяют, как ключи и значения сообщений сериализуются. - ProducerRecord: это сообщение, отправляемое в Kafka. Мы используем ключ, чтобы указать, в какую партицию попадет сообщение.
- Callback: метод обратного вызова позволяет узнать, успешно ли отправлено сообщение.
- Закрытие продюсера: обязательно закрывайте продюсер, чтобы освободить ресурсы.
Запустите этот код, и сообщения полетят в топик my-first-topic.
Реализация консьюмера
Теперь реализуем консьюмера, который считывает сообщения из топика.
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class SimpleConsumer {
public static void main(String[] args) {
// Настройки консьюмера
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-first-group"); // Группа консьюмеров
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest"); // Читаем сообщения с самого начала топика
// Создание консьюмера
Consumer<String, String> consumer = new KafkaConsumer<>(props);
// Подписываемся на топик
consumer.subscribe(Collections.singletonList("my-first-topic"));
try {
while (true) {
// Получаем записи из Kafka
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.printf("Получено сообщение: ключ=%s, значение=%s, партиция=%d, смещение=%d%n",
record.key(), record.value(), record.partition(), record.offset());
});
}
} finally {
// Закрываем консьюмера
consumer.close();
}
}
}
Вот что происходит в этом примере:
- Настройки консьюмера:
group.idопределяет группу, которой принадлежит консьюмер,auto.offset.resetуказывает читать сообщения с начала, если смещения нет. - Подписка на топик: метод
subscribe()позволяет подписаться на один или несколько топиков. - poll(): метод для получения новых сообщений из Kafka.
- Закрытие консьюмера: также важно закрывать консьюмер.
Запустите этот код, и он выведет сообщения, отправленные нашим продюсером.
Проверка работы
- Сначала запустите Kafka и убедитесь, что топик
my-first-topicсуществует. - Запустите
SimpleConsumer, который начнет слушать топик. - Запустите
SimpleProducer, и вы увидите, что сообщения отправляются продюсером и читаются консьюмером.
Полезные советы
- Сериализация и десериализация:
- Kafka позволяет использовать любые типы данных (например, JSON, Avro). Для этого необходимо воспользоваться соответствующими сериализаторами.
- Например, для JSON можно использовать
org.springframework.kafka.support.serializer.JsonSerializer.
- Управление offset:
- Offset — это "закладка", которая хранит, какие сообщения уже прочитаны.
- Если нужно читать сообщения повторно, установите
enable.auto.commit=falseи вручную фиксируйте offset.
Применение в реальной жизни
На практике продюсеры часто используются для записи событий, например, данных о транзакциях, логах или изменениях в базах данных. Консьюмеры, в свою очередь, могут обрабатывать эти события, отправлять уведомления, обновлять кэши или выполнять другие операции.
Kafka — это основа для высоконагруженных систем, таких как платежные шлюзы, системы рекомендаций, аналитика и многое другое. Поэтому понимание работы с Kafka — важный навык для Java-разработчика.
Теперь у вас есть свой продюсер и консьюмер. Вы только что сделали первый шаг в мир асинхронной магии Kafka!
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ