«1. Обзор
Apache Kafka — это мощная распределенная отказоустойчивая платформа с открытым исходным кодом для потоковой передачи событий. Однако, когда мы используем Kafka для отправки сообщений, размер которых превышает установленный лимит, возникает ошибка.
В предыдущем уроке мы показали, как работать со Spring и Kafka. В этом уроке мы рассмотрим способ отправки больших сообщений с помощью Kafka.
2. Постановка проблемы
Конфигурация Kafka ограничивает размер сообщений, которые разрешено отправлять. По умолчанию это ограничение составляет 1 МБ. Однако, если требуется отправлять большие сообщения, нам нужно настроить эти конфигурации в соответствии с нашими требованиями.
В этом уроке мы используем Kafka v2.5. Давайте сначала посмотрим на нашу настройку Kafka, прежде чем переходить к настройке.
3. Настройка
Здесь мы собираемся использовать базовую настройку Kafka с одним брокером. Кроме того, приложение-производитель может отправлять сообщения по определенной теме в Kafka Broker с помощью клиента Kafka. Кроме того, мы используем одну тему раздела:
Мы можем наблюдать здесь несколько точек взаимодействия, таких как Kafka Producer, Kafka Broker, Topic и Kafka Consumer. Следовательно, все они нуждаются в обновлениях конфигурации, чтобы иметь возможность отправлять большие сообщения с одного конца на другой.
Давайте подробно рассмотрим эти конфиги, чтобы отправить большое сообщение размером 20 МБ.
3. Настройка Kafka Producer
Это первое место, где возникает наше сообщение. И мы используем Spring Kafka для отправки сообщений из нашего приложения на сервер Kafka.
Следовательно, сначала необходимо обновить свойство «max.request.size». Дополнительные сведения об этой конфигурации производителя доступны в документации Kafka. Это доступно как константа ProducerConfig.MAX_REQUEST_SIZE_CONFIG в клиентской библиотеке Kafka, которая доступна как часть зависимости Spring Kafka.
Давайте настроим это значение на 20971520 байт:
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
return new DefaultKafkaProducerFactory<>(configProps);
}
4. Конфигурация темы Kafka
Наше приложение для создания сообщений отправляет сообщения в Kafka Broker по определенной теме. Следовательно, следующим требованием является настройка используемой темы Kafka. Это означает, что нам нужно обновить свойство «max.message.bytes», установив значение по умолчанию 1 МБ.
Здесь содержится значение наибольшего размера пакета записи Kafka после сжатия (если сжатие включено). Дополнительные сведения доступны в документации Kafka.
Давайте настроим это свойство вручную во время создания топика с помощью команды CLI:
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic longMessage --partitions 1 \
--replication-factor 1 --config max.message.bytes=20971520
В качестве альтернативы, мы можем настроить это свойство через Kafka Client:
public NewTopic topic() {
NewTopic newTopic = new NewTopic(longMsgTopicName, 1, (short) 1);
Map<String, String> configs = new HashMap<>();
configs.put("max.message.bytes", "20971520");
newTopic.configs(configs);
return newTopic;
}
Как минимум, нам нужно настроить эти два свойства.
5. Конфигурация брокера Kafka
Дополнительное свойство конфигурации «message.max.bytes» может использоваться для того, чтобы разрешить всем темам на брокере принимать сообщения размером более 1 МБ.
И это содержит значение наибольшего размера пакета записи, разрешенного Kafka после сжатия (если сжатие включено). Дополнительные сведения доступны в документации Kafka.
Добавим это свойство в конфигурационный файл «server.properties» Kafka Broker:
message.max.bytes=20971520
Более того, максимальное значение между «message.max.bytes» и «max.message.bytes» будет равно используемое эффективное значение.
6. Конфигурация потребителя
Давайте рассмотрим параметры конфигурации, доступные для потребителя Kafka. Хотя эти изменения не являются обязательными для обработки больших сообщений, отказ от них может повлиять на производительность приложения-потребителя. Следовательно, хорошо иметь и эти конфиги:
-
«max.partition.fetch.bytes: это свойство ограничивает количество байтов, которое потребитель может получить из раздела темы. Дополнительные сведения доступны в документации Kafka. Это доступно как константа с именем ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG в клиентской библиотеке Kafka fetch.max.bytes: это свойство ограничивает количество байтов, которые потребитель может получить с самого сервера Kafka. Потребитель Kafka также может прослушивать несколько разделов. Дополнительные сведения доступны в документации Kafka. Это доступно как константа ConsumerConfig.FETCH_MAX_BYTES_CONFIG в клиентской библиотеке Kafka
Поэтому для настройки наших потребителей нам нужно создать KafkaConsumerFactory. Помните, что нам всегда нужно использовать более высокое значение по сравнению с конфигурацией темы/брокера:
public ConsumerFactory<String, String> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");
return new DefaultKafkaConsumerFactory<>(props);
}
Здесь мы использовали одно и то же значение конфигурации 20971520 байт для обоих свойств, потому что мы используем тему с одним разделом. Однако значение FETCH_MAX_BYTES_CONFIG должно быть больше, чем MAX_PARTITION_FETCH_BYTES_CONFIG. Когда у нас есть потребитель, прослушивающий несколько разделов, FETCH_MAX_BYTES_CONFIG представляет размер сообщения, которое может быть получено из нескольких разделов. С другой стороны, конфигурация MAX_PARTITION_FETCH_BYTES_CONFIG представляет размер выборки сообщений из одного раздела.
7. Альтернативы
Мы увидели, как различные конфигурации в Kafka Producer, Topic, Broker и Kafka Consumer могут быть обновлены для отправки больших сообщений. Однако в целом нам следует избегать отправки больших сообщений с помощью Kafka. Обработка больших сообщений потребляет больше процессора и памяти нашего производителя и потребителя. Следовательно, в конечном итоге несколько ограничивает их возможности обработки для других задач. Кроме того, это может вызвать заметно большую задержку для конечного пользователя.
Давайте рассмотрим другие возможные варианты:
- Kafka producer provides a feature to compress messages. Additionally, it supports different compression types that we can configure using the compression.type property.
- We can store the large messages in a file at the shared storage location and send the location through Kafka message. This can be a faster option and has minimum processing overhead.
- Another option could be to split the large message into small messages of size 1KB each at the producer end. After that, we can send all these messages to a single partition using the partition key to ensure the correct order. Therefore, later, at the consumer end, we can reconstruct the large message from smaller messages.
Если ни один из вышеперечисленных вариантов не соответствует нашим требованиям, мы можем использовать ранее обсуждавшиеся конфигурации.
8. Заключение
В этой статье мы рассмотрели различные конфигурации Kafka, необходимые для отправки больших сообщений размером более 1 МБ.
Мы рассмотрели потребности в конфигурациях на стороне производителя, темы, брокера и потребителя. Однако некоторые из них являются обязательными конфигурациями, а некоторые — необязательными. Кроме того, потребительские конфигурации являются необязательными, но они полезны, чтобы избежать негативного влияния на производительность.
В конце мы также рассмотрели альтернативные возможные варианты отправки больших сообщений.
Как всегда, пример кода доступен на GitHub.