«1. Введение

Spring Cloud Data Flow — это облачная модель программирования и эксплуатации для компонуемых микросервисов данных.

С помощью Spring Cloud Data Flow разработчики могут создавать и организовывать конвейеры данных для распространенных случаев использования, таких как прием данных, аналитика в реальном времени и импорт/экспорт данных.

Эти конвейеры данных бывают двух видов: конвейеры потоковых и пакетных данных.

В первом случае неограниченное количество данных потребляется или создается с помощью промежуточного ПО для обмена сообщениями. В то время как во втором случае кратковременная задача обрабатывает конечный набор данных и затем завершается.

Эта статья будет посвящена потоковой обработке.

2. Обзор архитектуры

Ключевыми компонентами архитектуры этого типа являются приложения, сервер потока данных и целевая среда выполнения.

Также в дополнение к этим ключевым компонентам в архитектуре обычно есть оболочка потока данных и брокер сообщений.

Давайте рассмотрим все эти компоненты более подробно.

2.1. Приложения

Как правило, конвейер потоковой передачи данных включает в себя потребление событий из внешних систем, обработку данных и многоязычное сохранение. Эти фазы обычно называются Источником, Процессором и Приемником в терминологии Spring Cloud:

    Источник: приложение, которое использует события. следующее приложение в конвейере. Приемник: либо получает из источника, либо из процессора и записывает данные на желаемый уровень сохраняемости.

Эти приложения можно упаковать двумя способами:

  • Spring Boot uber-jar that is hosted in a maven repository, file, http or any other Spring resource implementation (this method will be used in this article)
  • Docker

Many sources, processor, and sink applications for common use-cases (e.g. jdbc, hdfs, http, router) are already provided and ready to use by the Spring Cloud Data Flow team.

2.2. Runtime

Also, a runtime is needed for these applications to execute. The supported runtimes are:

  • Cloud Foundry
  • Apache YARN
  • Kubernetes
  • Apache Mesos
  • Local Server for development (wich will be used in this article)

2.3. Data Flow Server

The component that is responsible for deploying applications to a runtime is the Data Flow Server. There is a Data Flow Server executable jar provided for each of the target runtimes.

The Data Flow Server is responsible for interpreting:

  • A stream DSL that describes the logical flow of data through multiple applications.
  • A deployment manifest that describes the mapping of applications onto the runtime.

2.4. Data Flow Shell

The Data Flow Shell is a client for the Data Flow Server. The shell allows us to perform the DSL command needed to interact with the server.

As an example, the DSL to describe the flow of data from an http source to a jdbc sink would be written as “http | jdbc”. These names in the DSL are registered with the Data Flow Server and map onto application artifacts that can be hosted in Maven or Docker repositories.

Spring also offer a graphical interface, named Flo, for creating and monitoring streaming data pipelines. However, its use is outside the discussion of this article.

2.5. Message Broker

As we’ve seen in the example of the previous section, we have used the pipe symbol into the definition of the flow of data. The pipe symbol represents the communication between the two applications via messaging middleware.

This means that we need a message broker up and running in the target environment.

The two messaging middleware brokers that are supported are:

  • Apache Kafka
  • RabbitMQ

And so, now that we have an overview of the architectural components – it’s time to build our first stream processing pipeline.

3. Установить брокер сообщений

Как мы видно, что приложениям в конвейере для связи требуется промежуточное программное обеспечение для обмена сообщениями. Для целей этой статьи мы будем использовать RabbitMQ.

Для получения полной информации об установке вы можете следовать инструкции на официальном сайте.

4. Локальный сервер потока данных

Чтобы ускорить процесс создания наших приложений, мы будем использовать Spring Initializr; с его помощью мы можем получить наши приложения Spring Boot за несколько минут.

После перехода на веб-сайт просто выберите группу и имя артефакта.

Как только это будет сделано, нажмите кнопку «Создать проект», чтобы начать загрузку артефакта Maven.

После завершения загрузки разархивируйте проект и импортируйте его как проект Maven в выбранную вами IDE.

Давайте добавим в проект зависимость от Maven. Поскольку нам понадобятся библиотеки Dataflow Local Server, давайте добавим зависимость spring-cloud-starter-dataflow-server-local:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
</dependency>

Теперь нам нужно аннотировать основной класс Spring Boot аннотацией @EnableDataFlowServer:

@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowServerApplication.class, args);
    }
}

Вот и все. Наш локальный сервер потока данных готов к выполнению:

mvn spring-boot:run

Приложение загрузится через порт 9393.

5. Оболочка потока данных

Снова перейдите в Spring Initializr и выберите группу и Название артефакта.

После того, как мы загрузили и импортировали проект, давайте добавим зависимость spring-cloud-dataflow-shell:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dataflow-shell</artifactId>
</dependency>

Теперь нам нужно добавить аннотацию @EnableDataFlowShell к основному классу Spring Boot:

@EnableDataFlowShell
@SpringBootApplication
public class SpringDataFlowShellApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(SpringDataFlowShellApplication.class, args);
    }
}

Теперь мы можем запустить оболочку:

mvn spring-boot:run

После запуска оболочки мы можем ввести команду help в приглашении, чтобы увидеть полный список команд, которые мы можем выполнить.

6. Исходное приложение

Аналогично, в Initializr мы создадим простое приложение и добавим зависимость Stream Rabbit с именем spring-cloud-starter-stream-rabbit:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

Мы затем добавьте аннотацию @EnableBinding(Source.class) к основному классу Spring Boot:

@EnableBinding(Source.class)
@SpringBootApplication
public class SpringDataFlowTimeSourceApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowTimeSourceApplication.class, args);
    }
}

Теперь нам нужно определить источник данных, которые должны быть обработаны. Этим источником может быть любая потенциально бесконечная рабочая нагрузка (данные датчиков Интернета вещей, круглосуточная обработка событий, прием данных онлайн-транзакций).

В нашем примере приложения мы создаем одно событие (для простоты — новую метку времени) каждые 10 секунд с помощью опросчика.

Аннотация @InboundChannelAdapter отправляет сообщение в выходной канал источника, используя возвращаемое значение в качестве полезной нагрузки сообщения:

@Bean
@InboundChannelAdapter(
  value = Source.OUTPUT, 
  poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1")
)
public MessageSource<Long> timeMessageSource() {
    return () -> MessageBuilder.withPayload(new Date().getTime()).build();
}

«

«Наш источник данных готов.

7. Приложение процессора

Далее мы создадим приложение и добавим зависимость Stream Rabbit.

@EnableBinding(Processor.class)
@SpringBootApplication
public class SpringDataFlowTimeProcessorApplication {

    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowTimeProcessorApplication.class, args);
    }
}

Затем мы добавим аннотацию @EnableBinding(Processor.class) к основному классу Spring Boot:

Далее нам нужно определить метод для обработки данных, поступающих из исходного приложения.

@Transformer(inputChannel = Processor.INPUT, 
  outputChannel = Processor.OUTPUT)
public Object transform(Long timestamp) {

    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy");
    String date = dateFormat.format(timestamp);
    return date;
}

Чтобы определить преобразователь, нам нужно аннотировать этот метод аннотацией @Transformer:

Он преобразует метку времени из «входного» канала в отформатированную дату, которая будет отправлена ​​в « выходной канал.

8. Приложение Sink

Последним приложением, которое необходимо создать, является приложение Sink.

Снова перейдите в Spring Initializr и выберите группу, имя артефакта. После загрузки проекта добавим зависимость Stream Rabbit.

@EnableBinding(Sink.class)
@SpringBootApplication
public class SpringDataFlowLoggingSinkApplication {

    public static void main(String[] args) {
	SpringApplication.run(
          SpringDataFlowLoggingSinkApplication.class, args);
    }
}

Затем добавьте аннотацию @EnableBinding(Sink.class) к основному классу Spring Boot:

Теперь нам нужен метод для перехвата сообщений, поступающих от процессорного приложения.

@StreamListener(Sink.INPUT)
public void loggerSink(String date) {
    logger.info("Received: " + date);
}

Для этого нам нужно добавить аннотацию @StreamListener(Sink.INPUT) к нашему методу:

Метод просто печатает отметку времени, преобразованную в отформатированную дату, в файл журнала.

9. Зарегистрируйте потоковое приложение

Spring Cloud Data Flow Shell позволяет нам зарегистрировать потоковое приложение в реестре приложений с помощью команды app register.

Мы должны предоставить уникальное имя, тип приложения и URI, которые могут быть разрешены для артефакта приложения. В качестве типа укажите «источник», «процессор» или «приемник».

maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>

При предоставлении URI со схемой maven формат должен соответствовать следующему:

app register --name time-source --type source 
  --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-source:jar:0.0.1-SNAPSHOT

app register --name time-processor --type processor 
  --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-processor:jar:0.0.1-SNAPSHOT

app register --name logging-sink --type sink 
  --uri maven://com.baeldung.spring.cloud:spring-data-flow-logging-sink:jar:0.0.1-SNAPSHOT

Чтобы зарегистрировать ранее созданные приложения источника, процессора и приемника, перейдите в Spring Cloud Data Flow Shell и выполните следующее команды из командной строки:

10. Создайте и разверните поток

stream create --name time-to-log 
  --definition 'time-source | time-processor | logging-sink'

Чтобы создать новое определение потока, перейдите в оболочку Spring Cloud Data Flow Shell и выполните следующую команду оболочки:

Это определяет поток с именем time-to-log на основе выражения DSL «источник времени | процессор времени | журнал-приемник».

stream deploy --name time-to-log

Затем, чтобы развернуть поток, выполните следующую команду оболочки:

Сервер потока данных разрешает источник времени, процессор времени и приемник регистрации в координаты maven и использует их для запуска источника времени, приложения потока, обрабатывающие время и регистрирующие стоки.

2016-08-24 12:29:10.516  INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer: deploying app time-to-log.logging-sink instance 0
   Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink
2016-08-24 12:29:17.600  INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.time-processor instance 0
   Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034556862/time-to-log.time-processor
2016-08-24 12:29:23.280  INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.time-source instance 0
   Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034562861/time-to-log.time-source

Если поток развернут правильно, вы увидите в журналах Data Flow Server, что модули были запущены и связаны вместе:

11. Проверка результата

В этом примере источник просто отправляет текущую временную метку в виде сообщения каждую секунду, процессор форматирует ее, а приемник журнала выводит отформатированную временную метку, используя структуру ведения журнала.

tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log
2016-08-24 12:40:42.029  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01
2016-08-24 12:40:52.035  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11
2016-08-24 12:41:02.030  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21

Файлы журнала расположены в каталоге, отображаемом в выходных данных журнала Data Flow Server, как показано выше. Чтобы увидеть результат, мы можем просмотреть журнал:

12. Заключение

В этой статье мы увидели, как построить конвейер данных для потоковой обработки с помощью Spring Cloud Data Flow.

Кроме того, мы увидели роль приложений источника, процессора и приемника внутри потока и то, как подключить и связать этот модуль с сервером потока данных с помощью оболочки потока данных.