«1. Введение

Apache Kafka — это платформа для обмена сообщениями. С его помощью мы можем обмениваться данными между различными приложениями в любом масштабе.

Spring Cloud Stream — это платформа для создания приложений, управляемых сообщениями. Это может упростить интеграцию Kafka в наши сервисы.

Обычно Kafka используется с форматом сообщений Avro, поддерживаемым реестром схем. В этом руководстве мы будем использовать реестр Confluent Schema. Мы попробуем реализацию интеграции Spring с реестром Confluent Schema Registry, а также собственные библиотеки Confluent.

2. Реестр объединенных схем

Kafka представляет все данные в виде байтов, поэтому обычно используется внешняя схема, а также сериализация и десериализация в байты в соответствии с этой схемой. Вместо того, чтобы предоставлять копию этой схемы с каждым сообщением, что было бы дорогостоящим накладным расходом, также принято хранить схему в реестре и предоставлять только идентификатор с каждым сообщением.

Confluent Schema Registry предоставляет простой способ хранения, извлечения и управления схемами. Он предоставляет несколько полезных RESTful API.

Схемы хранятся по субъектам, и по умолчанию реестр выполняет проверку совместимости, прежде чем разрешить загрузку новой схемы для субъекта.

Каждый производитель будет знать схему, которую он создает, и каждый потребитель должен иметь возможность либо потреблять данные в ЛЮБОМ формате, либо должен иметь определенную схему, которую он предпочитает считывать. Производитель консультируется с реестром, чтобы установить правильный идентификатор для использования. при отправке сообщения. Потребитель использует реестр для получения схемы отправителя.

Когда потребитель знает как схему отправителя, так и желаемый формат сообщения, библиотека Avro может преобразовать данные в нужный потребителю формат.

3. Apache Avro

Apache Avro — это система сериализации данных.

Он использует структуру JSON для определения схемы, обеспечивая сериализацию между байтами и структурированными данными.

Одной из сильных сторон Avro является поддержка преобразования сообщений, написанных в одной версии схемы, в формат, определяемый совместимой альтернативной схемой.

Набор инструментов Avro также может генерировать классы для представления структур данных этих схем, упрощая сериализацию в POJO и из них.

4. Настройка проекта

Чтобы использовать реестр схемы с Spring Cloud Stream, нам потребуются зависимости Spring Cloud Kafka Binder и реестра схемы Maven:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-schema</artifactId>
</dependency>

Для сериализатора Confluent нам потребуется: ~~ ~

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>4.0.0</version>
</dependency>

Сериализатор Confluent находится в их репозитории:

<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>

Кроме того, давайте воспользуемся плагином Maven для создания классов Avro:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.8.2</version>
            <executions>
                <execution>
                    <id>schemas</id>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                        <goal>protocol</goal>
                        <goal>idl-protocol</goal>
                    </goals>
                    <configuration>                        
                        <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Для тестирования мы можем использовать существующую Kafka и Schema Настройка реестра или использование докеризированных Confluent и Kafka.

5. Spring Cloud Stream

Теперь, когда мы настроили наш проект, давайте напишем производителя с использованием Spring Cloud Stream. Он будет публиковать сведения о сотрудниках по теме.

Затем мы создадим потребителя, который будет считывать события из темы и записывать их в оператор журнала.

5.1. Схема

Во-первых, давайте определим схему для сведений о сотрудниках. Мы можем назвать его employee-schema.avsc.

Мы можем сохранить файл схемы в src/main/resources:

{
    "type": "record",
    "name": "Employee",
    "namespace": "com.baeldung.schema",
    "fields": [
    {
        "name": "id",
        "type": "int"
    },
    {
        "name": "firstName",
        "type": "string"
    },
    {
        "name": "lastName",
        "type": "string"
    }]
}

После создания приведенной выше схемы нам нужно собрать проект. Затем генератор кода Apache Avro создаст POJO с именем Employee в пакете com.baeldung.schema.

5.2. Производитель

Spring Cloud Stream предоставляет интерфейс процессора. Это дает нам выходной и входной канал.

Давайте воспользуемся этим, чтобы создать производителя, который отправляет объекты Employee в топик с информацией о сотрудниках Kafka:

@Autowired
private Processor processor;

public void produceEmployeeDetails(int empId, String firstName, String lastName) {

    // creating employee details
    Employee employee = new Employee();
    employee.setId(empId);
    employee.setFirstName(firstName);
    employee.setLastName(lastName);

    Message<Employee> message = MessageBuilder.withPayload(employee)
                .build();

    processor.output()
        .send(message);
}

5.2. Потребитель

Теперь давайте напишем нашего потребителя:

@StreamListener(Processor.INPUT)
public void consumeEmployeeDetails(Employee employeeDetails) {
    logger.info("Let's process employee details: {}", employeeDetails);
}

Этот потребитель будет читать события, опубликованные в теме сведений о сотрудниках. Давайте направим его вывод в журнал, чтобы посмотреть, что он делает.

5.3. Kafka Bindings

До сих пор мы работали только с входными и выходными каналами нашего объекта Processor. Эти каналы необходимо настроить с правильными пунктами назначения.

Давайте используем application.yml для предоставления привязок Kafka:

spring:
  cloud:
    stream: 
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
        output:
          destination: employee-details
          content-type: application/*+avro

«

«Отметим, что в данном случае под пунктом назначения понимается тема Кафки. Может немного сбивать с толку то, что это называется назначением, поскольку в данном случае это источник входных данных, но это согласованный термин для потребителей и производителей.

5.4. Точка входа

@Autowired
private AvroProducer avroProducer;

@PostMapping("/employees/{id}/{firstName}/{lastName}")
public String producerAvroMessage(@PathVariable int id, @PathVariable String firstName, 
  @PathVariable String lastName) {
    avroProducer.produceEmployeeDetails(id, firstName, lastName);
    return "Sent employee details to consumer";
}

Теперь, когда у нас есть производитель и потребитель, давайте представим API для приема входных данных от пользователя и передачи их производителю:

5.5. Включите реестр Confluent Schema и привязки

@SpringBootApplication
@EnableBinding(Processor.class)
@EnableSchemaRegistryClient
public class AvroKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(AvroKafkaApplication.class, args);
    }

}

Наконец, чтобы наше приложение применяло привязки реестра Kafka и схемы, нам нужно добавить @EnableBinding и @EnableSchemaRegistryClient в один из наших классов конфигурации:

@Value("${spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url}")
private String endPoint;

@Bean
public SchemaRegistryClient schemaRegistryClient() {
    ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
    client.setEndpoint(endPoint);
    return client;
}

И мы должны предоставить bean-компонент ConfluentSchemaRegistryClient:

EndPoint — это URL-адрес реестра Confluent Schema.

5.6. Тестирование нашего сервиса

curl -X POST localhost:8080/employees/1001/Harry/Potter

Давайте протестируем сервис с помощью POST-запроса:

2019-06-11 18:45:45.343  INFO 17036 --- [container-0-C-1] com.baeldung.consumer.AvroConsumer       : Let's process employee details: {"id": 1001, "firstName": "Harry", "lastName": "Potter"}

Логи сообщают нам, что это сработало:

5.7. Что произошло во время обработки?

  1. The producer built the Kafka message using the Employee object
  2. The producer registered the employee schema with the schema registry to get a schema version ID, this either creates a new ID or reuses the existing one for that exact schema
  3. Avro serialized the Employee object using the schema
  4. Spring Cloud put the schema-id in the message headers
  5. The message was published on the topic
  6. When the message came to the consumer, it read the schema-id from the header
  7. The consumer used schema-id to get the Employee schema from the registry
  8. The consumer found a local class that could represent that object and deserialized the message into it

Давайте попробуем понять, что именно произошло с нашим примером приложения:

6. Сериализация/десериализация с использованием собственных библиотек Kafka

Spring Boot предоставляет несколько готовых преобразователей сообщений. По умолчанию Spring Boot использует заголовок Content-Type для выбора соответствующего преобразователя сообщений.

В нашем примере Content-Type — это application/*+avro, поэтому он использовал AvroSchemaMessageConverter для чтения и записи форматов Avro. Но Confluent рекомендует использовать KafkaAvroSerializer и KafkaAvroDeserializer для преобразования сообщений.

Хотя собственный формат Spring работает хорошо, у него есть некоторые недостатки с точки зрения разделения, и он не совместим со стандартами Confluent, что может потребоваться некоторым службам, отличным от Spring, в нашем экземпляре Kafka.

spring:
  cloud:
    stream:
      default: 
        producer: 
          useNativeEncoding: true
        consumer:  
          useNativeEncoding: true     
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
        output:
          destination: employee-details
          content-type: application/*+avro
      kafka:
         binder:        
           producer-properties:
             key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
             value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
             schema.registry.url: http://localhost:8081 
           consumer-properties:
             key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
             value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
             schema.registry.url: http://localhost:8081
             specific.avro.reader: true

Давайте обновим наш application.yml, чтобы использовать преобразователи Confluent:

Мы включили useNativeEncoding. Это заставляет Spring Cloud Stream делегировать сериализацию предоставленным классам.

Мы также должны знать, как мы можем предоставить собственные свойства настроек для Kafka в Spring Cloud, используя kafka.binder.producer-properties и kafka.binder.consumer-properties.

7. Группы потребителей и разделы

Группы потребителей — это набор потребителей, принадлежащих одному и тому же приложению. Потребители из одной и той же группы потребителей имеют одно и то же имя группы.

spring:
  cloud:
    stream:
      // ...     
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
          group: group-1
      // ...

Обновим application.yml, чтобы добавить имя группы потребителей:

Все потребители равномерно распределяют разделы темы между собой. Сообщения в разных разделах будут обрабатываться параллельно.

В группе потребителей максимальное количество потребителей, одновременно читающих сообщения, равно количеству разделов. Таким образом, мы можем настроить количество разделов и потребителей, чтобы получить желаемый параллелизм. В общем случае у нас должно быть больше разделов, чем общее количество потребителей во всех репликах нашего сервиса.

7.1. Ключ раздела

При обработке наших сообщений порядок их обработки может быть важен. Когда наши сообщения обрабатываются параллельно, последовательность обработки будет трудно контролировать.

Kafka предоставляет правило, согласно которому в данном разделе сообщения всегда обрабатываются в той последовательности, в которой они прибыли. Итак, если важно, чтобы определенные сообщения обрабатывались в правильном порядке, мы гарантируем, что они попадут в один и тот же раздел.

Мы можем предоставить ключ раздела при отправке сообщения в тему. Сообщения с одним и тем же ключом раздела всегда будут отправляться в один и тот же раздел. Если ключ раздела отсутствует, сообщения будут разбиты на разделы в циклическом режиме.

Давайте попробуем понять это на примере. Представьте, что мы получаем несколько сообщений для сотрудника и хотим обработать все сообщения сотрудника в последовательности. Название отдела и идентификатор сотрудника могут однозначно идентифицировать сотрудника.

{
    "type": "record",
    "name": "EmployeeKey",
    "namespace": "com.baeldung.schema",
    "fields": [
     {
        "name": "id",
        "type": "int"
    },
    {
        "name": "departmentName",
        "type": "string"
    }]
}

Итак, давайте определим ключ раздела с идентификатором сотрудника и названием отдела:

После сборки проекта POJO EmployeeKey будет сгенерирован в пакете com.baeldung.schema.

public void produceEmployeeDetails(int empId, String firstName, String lastName) {

    // creating employee details
    Employee employee = new Employee();
    employee.setId(empId);
    // ...

    // creating partition key for kafka topic
    EmployeeKey employeeKey = new EmployeeKey();
    employeeKey.setId(empId);
    employeeKey.setDepartmentName("IT");

    Message<Employee> message = MessageBuilder.withPayload(employee)
        .setHeader(KafkaHeaders.MESSAGE_KEY, employeeKey)
        .build();

    processor.output()
        .send(message);
}

«Давайте изменим нашего производителя, чтобы он использовал EmployeeKey в качестве ключа раздела:

Здесь мы помещаем ключ раздела в заголовок сообщения.

Теперь тот же раздел будет получать сообщения с тем же идентификатором сотрудника и названием отдела.

7.2 Параллелизм потребителей

spring:
  cloud:
    stream:
      // ... 
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
          group: group-1
          concurrency: 3

Spring Cloud Stream позволяет нам установить параллелизм для потребителя в application.yml:

Теперь наши потребители будут одновременно читать три сообщения из темы. Другими словами, Spring создаст три разных потока для независимого использования.

8. Заключение

В этой статье мы интегрировали производителя и потребителя с Apache Kafka со схемами Avro и реестром Confluent Schema Registry.

Мы сделали это в одном приложении, но производитель и потребитель могли быть развернуты в разных приложениях и могли иметь свои собственные версии схем, которые синхронизировались через реестр.

Мы рассмотрели, как использовать реализацию клиента Avro и Schema Registry в Spring, а затем увидели, как переключиться на стандартную реализацию сериализации и десериализации Confluent для целей взаимодействия.

Наконец, мы рассмотрели, как разделить нашу тему и убедиться, что у нас есть правильные ключи сообщений, чтобы обеспечить безопасную параллельную обработку наших сообщений.