«1. Введение

Kinesis — это инструмент для сбора, обработки и анализа потоков данных в режиме реального времени, разработанный в Amazon. Одним из его основных преимуществ является то, что он помогает в разработке приложений, управляемых событиями.

В этом руководстве мы рассмотрим несколько библиотек, которые позволяют нашему приложению Spring создавать и использовать записи из Kinesis Stream. В примерах кода показаны основные функциональные возможности, но не представлен код, готовый к работе.

2. Предпосылки

Прежде чем двигаться дальше, нам нужно сделать две вещи.

Во-первых, нужно создать проект Spring, так как цель здесь — взаимодействовать с Kinesis из проекта Spring.

Второй — создать Kinesis Data Stream. Мы можем сделать это из веб-браузера в нашей учетной записи AWS. Одной из альтернатив для поклонников AWS CLI среди нас является использование командной строки. Поскольку мы будем взаимодействовать с ним из кода, у нас также должны быть учетные данные AWS IAM, ключ доступа и секретный ключ, а также регион.

Все наши производители будут создавать фиктивные записи IP-адресов, в то время как потребители будут читать эти значения и перечислять их в консоли приложения.

3. AWS SDK для Java

Самая первая библиотека, которую мы будем использовать, — это AWS SDK для Java. Его преимущество в том, что он позволяет нам управлять многими частями работы с Kinesis Data Streams. Мы можем читать данные, создавать данные, создавать потоки данных и повторно разделять потоки данных. Недостатком является то, что для того, чтобы иметь готовый к производству код, нам придется кодировать такие аспекты, как повторное разбиение, обработка ошибок или демон, чтобы поддерживать жизнь потребителя.

3.1. Зависимость Maven

Зависимость amazon-kinesis-client от Maven предоставит все необходимое для работы примеров. Теперь мы добавим его в наш файл pom.xml:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>1.11.2</version>
</dependency>

3.2. Spring Setup

Давайте повторно используем объект AmazonKinesis, необходимый для взаимодействия с нашим Kinesis Stream. Мы создадим его как @Bean внутри нашего класса @SpringBootApplication:

@Bean
public AmazonKinesis buildAmazonKinesis() {
    BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
    return AmazonKinesisClientBuilder.standard()
      .withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
      .withRegion(Regions.EU_CENTRAL_1)
      .build();
}

Далее определим aws.access.key и aws.secret.key, необходимые для локальной машины, в application.properties: ~ ~~

aws.access.key=my-aws-access-key-goes-here
aws.secret.key=my-aws-secret-key-goes-here

И мы будем читать их, используя аннотацию @Value:

@Value("${aws.access.key}")
private String accessKey;

@Value("${aws.secret.key}")
private String secretKey;

Для простоты мы будем полагаться на методы @Scheduled для создания и использования записей.

3.3. Потребитель

AWS SDK Kinesis Consumer использует модель извлечения, то есть наш код будет извлекать записи из сегментов потока данных Kinesis:

GetRecordsRequest recordsRequest = new GetRecordsRequest();
recordsRequest.setShardIterator(shardIterator.getShardIterator());
recordsRequest.setLimit(25);

GetRecordsResult recordsResult = kinesis.getRecords(recordsRequest);
while (!recordsResult.getRecords().isEmpty()) {
    recordsResult.getRecords().stream()
      .map(record -> new String(record.getData().array()))
      .forEach(System.out::println);

    recordsRequest.setShardIterator(recordsResult.getNextShardIterator());
    recordsResult = kinesis.getRecords(recordsRequest);
}

Объект GetRecordsRequest создает запрос на потоковые данные. В нашем примере мы определили ограничение в 25 записей на запрос, и мы продолжаем чтение до тех пор, пока не останется ничего для чтения.

Мы также можем заметить, что для нашей итерации мы использовали объект GetShardIteratorResult. Мы создали этот объект внутри метода @PostConstruct, чтобы сразу начать отслеживать записи:

private GetShardIteratorResult shardIterator;

@PostConstruct
private void buildShardIterator() {
    GetShardIteratorRequest readShardsRequest = new GetShardIteratorRequest();
    readShardsRequest.setStreamName(IPS_STREAM);
    readShardsRequest.setShardIteratorType(ShardIteratorType.LATEST);
    readShardsRequest.setShardId(IPS_SHARD_ID);

    this.shardIterator = kinesis.getShardIterator(readShardsRequest);
}

3.4. Producer

Давайте теперь посмотрим, как управлять созданием записей для нашего потока данных Kinesis.

Мы вставляем данные, используя объект PutRecordsRequest. Для этого нового объекта мы добавляем список, содержащий несколько объектов PutRecordsRequestEntry:

List<PutRecordsRequestEntry> entries = IntStream.range(1, 200).mapToObj(ipSuffix -> {
    PutRecordsRequestEntry entry = new PutRecordsRequestEntry();
    entry.setData(ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()));
    entry.setPartitionKey(IPS_PARTITION_KEY);
    return entry;
}).collect(Collectors.toList());

PutRecordsRequest createRecordsRequest = new PutRecordsRequest();
createRecordsRequest.setStreamName(IPS_STREAM);
createRecordsRequest.setRecords(entries);

kinesis.putRecords(createRecordsRequest);

Мы создали базового потребителя и производителя смоделированных IP-записей. Все, что осталось сделать сейчас, это запустить наш проект Spring и увидеть IP-адреса, перечисленные в консоли нашего приложения.

4. KCL и KPL

Клиентская библиотека Kinesis (KCL) — это библиотека, которая упрощает использование записей. Это также уровень абстракции API-интерфейсов AWS SDK Java для Kinesis Data Streams. За кулисами библиотека выполняет балансировку нагрузки между многими экземплярами, реагируя на сбои экземпляров, устанавливая контрольные точки обработанных записей и реагируя на повторное разбиение.

Kinesis Producer Library (KPL) — это библиотека, полезная для записи в поток данных Kinesis. Он также обеспечивает уровень абстракции, который находится над API Java SDK AWS для Kinesis Data Streams. Для повышения производительности библиотека автоматически обрабатывает пакетную обработку, многопоточность и логику повторных попыток.

KCL и KPL имеют основное преимущество в том, что они просты в использовании, поэтому мы можем сосредоточиться на создании и использовании записей.

4.1. Зависимости Maven

«При необходимости две библиотеки можно добавить в наш проект отдельно. Чтобы включить KPL и KCL в наш проект Maven, нам нужно обновить наш файл pom.xml:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>0.13.1</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>1.11.2</version>
</dependency>

4.2. Настройка Spring

Единственная подготовка к Spring, которая нам нужна, — это убедиться, что у нас есть учетные данные IAM. Значения для aws.access.key и aws.secret.key определены в нашем файле application.properties, поэтому при необходимости мы можем прочитать их с помощью @Value.

4.3. Потребитель

Сначала мы создадим класс, реализующий интерфейс IRecordProcessor и определяющий нашу логику обработки записей потока данных Kinesis, то есть их вывод на консоль:

public class IpProcessor implements IRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) { }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        processRecordsInput.getRecords()
          .forEach(record -> System.out.println(new String(record.getData().array())));
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) { }
}

Следующий шаг — определить фабричный класс, который реализует интерфейс IRecordProcessorFactory и возвращает ранее созданный объект IpProcessor:

public class IpProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new IpProcessor();
    }
}

И теперь, в качестве последнего шага, мы будем использовать объект Worker для определения нашего конвейера-потребителя. Нам нужен объект KinesisClientLibConfiguration, который при необходимости будет определять учетные данные IAM и регион AWS.

Мы передадим KinesisClientLibConfiguration и наш объект IpProcessorFactory нашему Worker, а затем запустим его в отдельном потоке. Мы постоянно поддерживаем эту логику использования записей с помощью класса Worker, поэтому теперь мы постоянно читаем новые записи:

BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisClientLibConfiguration consumerConfig = new KinesisClientLibConfiguration(
  APP_NAME, 
  IPS_STREAM,
  new AWSStaticCredentialsProvider(awsCredentials), 
  IPS_WORKER)
    .withRegionName(Regions.EU_CENTRAL_1.getName());

final Worker worker = new Worker.Builder()
  .recordProcessorFactory(new IpProcessorFactory())
  .config(consumerConfig)
  .build();
CompletableFuture.runAsync(worker.run());

4.4. Producer

Давайте теперь определим объект KinesisProducerConfiguration, добавив учетные данные IAM и регион AWS:

BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration()
  .setCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials))
  .setVerifyCertificate(false)
  .setRegion(Regions.EU_CENTRAL_1.getName());

this.kinesisProducer = new KinesisProducer(producerConfig);

Мы включим объект kinesisProducer, ранее созданный в задание @Scheduled, и будем непрерывно создавать записи для нашего потока данных Kinesis:

IntStream.range(1, 200).mapToObj(ipSuffix -> ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()))
  .forEach(entry -> kinesisProducer.addUserRecord(IPS_STREAM, IPS_PARTITION_KEY, entry));

5. Spring Cloud Stream Binder Kinesis

Мы уже видели две библиотеки, обе созданы вне экосистемы Spring. Теперь мы увидим, как Spring Cloud Stream Binder Kinesis может еще больше упростить нашу жизнь, используя Spring Cloud Stream.

5.1. Зависимость Maven

Зависимость Maven, которую нам нужно определить в нашем приложении для Spring Cloud Stream Binder Kinesis:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kinesis</artifactId>
    <version>1.2.1.RELEASE</version>
</dependency>

5.2. Spring Setup

При работе на EC2 необходимые свойства AWS обнаруживаются автоматически, поэтому нет необходимости их определять. Поскольку мы запускаем наши примеры на локальном компьютере, нам нужно определить наш ключ доступа к IAM, секретный ключ и регион для нашей учетной записи AWS. Мы также отключили автоматическое определение имени стека CloudFormation для приложения:

cloud.aws.credentials.access-key=my-aws-access-key
cloud.aws.credentials.secret-key=my-aws-secret-key
cloud.aws.region.static=eu-central-1
cloud.aws.stack.auto=false

Spring Cloud Stream поставляется с тремя интерфейсами, которые мы можем использовать в привязке потока: используется для публикации записей Процессор представляет собой комбинацию обоих

    Мы также можем определить наши собственные интерфейсы, если нам это нужно.

5.3. Потребитель

Определение потребителя состоит из двух частей. Во-первых, мы определим в application.properties поток данных, из которого мы будем потреблять:

А затем давайте определим класс Spring @Component. Аннотация @EnableBinding(Sink.class) позволит нам читать из потока Kinesis с помощью метода, аннотированного с помощью @StreamListener(Sink.INPUT):

spring.cloud.stream.bindings.input.destination=live-ips
spring.cloud.stream.bindings.input.group=live-ips-group
spring.cloud.stream.bindings.input.content-type=text/plain

5.4. Производитель

@EnableBinding(Sink.class)
public class IpConsumer {

    @StreamListener(Sink.INPUT)
    public void consume(String ip) {
        System.out.println(ip);
    }
}

Производитель также может быть разделен на две части. Во-первых, мы должны определить свойства нашего потока внутри application.properties:

Затем мы добавляем @EnableBinding(Source.class) в Spring @Component и создаем новые тестовые сообщения каждые несколько секунд:

spring.cloud.stream.bindings.output.destination=live-ips
spring.cloud.stream.bindings.output.content-type=text/plain

~~ ~ Это все, что нам нужно для работы Spring Cloud Stream Binder Kinesis. Мы можем просто запустить приложение прямо сейчас.

@Component
@EnableBinding(Source.class)
public class IpProducer {

    @Autowired
    private Source source;

    @Scheduled(fixedDelay = 3000L)
    private void produce() {
        IntStream.range(1, 200).mapToObj(ipSuffix -> "192.168.0." + ipSuffix)
          .forEach(entry -> source.output().send(MessageBuilder.withPayload(entry).build()));
    }
}

6. Заключение

В этой статье мы рассмотрели, как интегрировать наш проект Spring с двумя библиотеками AWS для взаимодействия с Kinesis Data Stream. Мы также увидели, как использовать библиотеку Spring Cloud Stream Binder Kinesis, чтобы сделать реализацию еще проще.

Исходный код этой статьи можно найти на Github.

«