«1. Обзор
Когда производитель отправляет сообщение в Apache Kafka, он добавляет его в файл журнала и сохраняет в течение заданного периода времени.
В этом руководстве мы научимся настраивать свойства хранения сообщений на основе времени для тем Kafka.
2. Хранение на основе времени
При заданных свойствах периода хранения сообщения имеют TTL (время жизни). По истечении срока действия сообщения помечаются для удаления, тем самым освобождая место на диске.
Одно и то же свойство срока хранения применяется ко всем сообщениям в заданной теме Kafka. Кроме того, мы можем установить эти свойства либо перед созданием темы, либо изменить их во время выполнения для уже существующей темы.
В следующих разделах мы узнаем, как настроить это с помощью конфигурации брокера для установки периода хранения для новых тем и конфигурации на уровне темы для управления им во время выполнения.
3. Конфигурация на уровне сервера
Apache Kafka поддерживает политику хранения на уровне сервера, которую можно настроить, настроив ровно одно из трех свойств конфигурации, основанных на времени:
-
log.retention.hours log.retention. минуты log.retention.ms
Важно понимать, что Kafka переопределяет значение с меньшей точностью на более высокое. Таким образом, log.retention.ms будет иметь наивысший приоритет.
3.1. Основы
Во-первых, давайте проверим значение по умолчанию для хранения, выполнив команду grep из каталога Apache Kafka:
$ grep -i 'log.retention.[hms].*\=' config/server.properties
log.retention.hours=168
Здесь мы можем заметить, что время хранения по умолчанию составляет семь дней.
Чтобы сообщения сохранялись только в течение десяти минут, мы можем установить значение свойства log.retention.minutes в config/server.properties:
log.retention.minutes=10
3.2. Период хранения для новой темы
Пакет Apache Kafka содержит несколько сценариев оболочки, которые мы можем использовать для выполнения административных задач. Мы будем использовать их для создания вспомогательного скрипта functions.sh, который мы будем использовать в ходе этого руководства.
Давайте начнем с добавления двух функций в functions.sh для создания темы и описания ее конфигурации соответственно:
function create_topic {
topic_name="$1"
bin/kafka-topics.sh --create --topic ${topic_name} --if-not-exists \
--partitions 1 --replication-factor 1 \
--zookeeper localhost:2181
}
function describe_topic_config {
topic_name="$1"
./bin/kafka-configs.sh --describe --all \
--bootstrap-server=0.0.0.0:9092 \
--topic ${topic_name}
}
Затем давайте создадим два автономных скрипта, create-topic.sh и get-topic-retention- time.sh:
bash-5.1# cat create-topic.sh
#!/bin/bash
. ./functions.sh
topic_name="$1"
create_topic "${topic_name}"
exit $?
bash-5.1# cat get-topic-retention-time.sh
#!/bin/bash
. ./functions.sh
topic_name="$1"
describe_topic_config "${topic_name}" | awk 'BEGIN{IFS="=";IRS=" "} /^[ ]*retention.ms/{print $1}'
exit $?
Мы должны отметить, что description_topic_config выдаст все свойства, настроенные для темы. Итак, мы использовали однострочник awk, чтобы добавить фильтр для свойства Retention.ms.
bash-5.1# ./create-topic.sh test-topic
Created topic test-topic.
bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=600000
Наконец, давайте запустим среду Kafka и проверим конфигурацию периода хранения для нового примера темы: ). На самом деле это производное от свойства log.retention.minutes, которое мы ранее определили в файле server.properties.
4. Конфигурация на уровне темы
После запуска сервера брокера свойства log.retention.{hours|minutes|ms} на уровне сервера становятся доступными только для чтения. С другой стороны, мы получаем доступ к свойству Retention.ms, которое мы можем настроить на уровне темы.
Давайте добавим в наш скрипт functions.sh метод для настройки свойства темы:
function alter_topic_config {
topic_name="$1"
config_name="$2"
config_value="$3"
./bin/kafka-configs.sh --alter \
--add-config ${config_name}=${config_value} \
--bootstrap-server=0.0.0.0:9092 \
--topic ${topic_name}
}
Затем мы можем использовать это в скрипте alter-topic-config.sh:
#!/bin/sh
. ./functions.sh
alter_topic_retention_config $1 $2 $3
exit $?
Наконец, давайте установим время хранения на пять минут для тестовой темы и проверим то же самое:
bash-5.1# ./alter-topic-config.sh test-topic retention.ms 300000
Completed updating config for topic test-topic.
bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=300000
5. Проверка
До сих пор мы видели, как мы можем настроить период хранения сообщения в пределах Тема Кафки. Пришло время убедиться, что срок действия сообщения действительно истекает после истечения срока хранения.
5.1. Producer-Consumer
Давайте добавим функции product_message и Consumer_message в файл functions.sh. Внутри они используют kafka-console-producer.sh и kafka-console-consumer.sh, соответственно, для создания/потребления сообщения:
function produce_message {
topic_name="$1"
message="$2"
echo "${message}" | ./bin/kafka-console-producer.sh \
--bootstrap-server=0.0.0.0:9092 \
--topic ${topic_name}
}
function consume_message {
topic_name="$1"
timeout="$2"
./bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning \
--topic ${topic_name} \
--max-messages 1 \
--timeout-ms $timeout
}
Мы должны отметить, что потребитель всегда читает сообщения с самого начала, как нам нужен потребитель, который читает любое доступное сообщение в Kafka.
Далее давайте создадим автономного производителя сообщений:
bash-5.1# cat producer.sh
#!/bin/sh
. ./functions.sh
topic_name="$1"
message="$2"
produce_message ${topic_name} ${message}
exit $?
Наконец, давайте создадим автономного потребителя сообщений:
bash-5.1# cat consumer.sh
#!/bin/sh
. ./functions.sh
topic_name="$1"
timeout="$2"
consume_message ${topic_name} $timeout
exit $?
5.2. Истечение срока действия сообщения
Теперь, когда у нас есть готовая базовая настройка, давайте создадим одно сообщение и мгновенно воспользуемся им дважды:
bash-5.1# ./producer.sh "test-topic-2" "message1"
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages
«
«Итак, мы видим, что потребитель повторно использует любое доступное сообщение.
bash-5.1# sleep 300 && ./consumer.sh test-topic 10000
[2021-02-06 21:55:00,896] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages
Теперь давайте введем пятиминутную задержку ожидания, а затем попытаемся обработать сообщение:
Как и ожидалось, потребитель не нашел ни одного сообщения для обработки, потому что срок хранения сообщения истек.
6. Ограничения
Внутри Kafka Broker поддерживает другое свойство, называемое log.retention.check.interval.ms. Это свойство определяет частоту, с которой сообщения проверяются на истечение срока действия.
Таким образом, чтобы сохранить эффективность политики хранения, мы должны убедиться, что значение log.retention.check.interval.ms ниже, чем значение свойства Retention.ms для любой заданной темы.
7. Заключение