«1. Обзор

Spring Cloud Data Flow — это облачный набор инструментов для создания конвейеров данных в реальном времени и пакетных процессов. Spring Cloud Data Flow готов к использованию в ряде случаев обработки данных, таких как простой импорт/экспорт, обработка ETL, потоковая передача событий и прогнозная аналитика.

В этом руководстве мы рассмотрим пример извлечения, преобразования и загрузки (ETL) в реальном времени с использованием потокового конвейера, который извлекает данные из базы данных JDBC, преобразует их в простые POJO и загружает их в MongoDB.

2. ETL и обработка потока событий

ETL — извлечение, преобразование и загрузка — обычно называют процессом пакетной загрузки данных из нескольких баз данных и систем в общее хранилище данных. В этом хранилище данных можно выполнять интенсивную обработку данных без ущерба для общей производительности системы.

Однако новые тенденции меняют то, как это делается. ETL по-прежнему играет роль в передаче данных в хранилища данных и озера данных.

В настоящее время это можно сделать с потоками в архитектуре потока событий с помощью Spring Cloud Data Flow.

3. Spring Cloud Data Flow

Spring Cloud Data Flow (SCDF) позволяет разработчикам создавать конвейеры данных двух видов:

    Долгоживущие потоковые приложения реального времени с использованием Spring Cloud Stream Кратковременная пакетная задача приложения, использующие Spring Cloud Task

В этой статье мы рассмотрим первое долгоживущее потоковое приложение, основанное на Spring Cloud Stream.

3.1. Приложения Spring Cloud Stream

Конвейеры SCDF Stream состоят из шагов, где каждый шаг представляет собой приложение, созданное в стиле Spring Boot с использованием микрофреймворка Spring Cloud Stream. Эти приложения интегрируются промежуточным программным обеспечением для обмена сообщениями, таким как Apache Kafka или RabbitMQ.

Эти приложения подразделяются на источники, процессоры и приемники. По сравнению с процессом ETL можно сказать, что источник — это «извлечение», процессор — это «преобразователь», а приемник — это «загрузочная» часть.

В некоторых случаях мы можем использовать стартер приложения на одном или нескольких этапах конвейера. Это означает, что нам не нужно реализовывать новое приложение для шага, а вместо этого настраивать уже реализованный стартер существующего приложения.

Список стартеров приложений можно найти здесь.

3.2. Spring Cloud Data Flow Server

Последняя часть архитектуры — Spring Cloud Data Flow Server. Сервер SCDF выполняет развертывание приложений и конвейерного потока с использованием спецификации Spring Cloud Deployer. Эта спецификация поддерживает облачный вариант SCDF за счет развертывания в ряде современных сред выполнения, таких как Kubernetes, Apache Mesos, Yarn и Cloud Foundry.

Кроме того, мы можем запустить поток как локальное развертывание.

Более подробную информацию об архитектуре SCDF можно найти здесь.

4. Настройка среды

Прежде чем мы начнем, нам нужно выбрать части этого сложного развертывания. Первым элементом, который необходимо определить, является сервер SCDF.

Для тестирования мы будем использовать SCDF Server Local для локальной разработки. Для производственного развертывания позже мы можем выбрать облачную среду выполнения, например SCDF Server Kubernetes. Мы можем найти список сред выполнения сервера здесь.

Теперь давайте проверим системные требования для запуска этого сервера.

4.1. Системные требования

Чтобы запустить сервер SCDF, нам нужно определить и настроить две зависимости:

    промежуточное ПО для обмена сообщениями и РСУБД.

В качестве промежуточного ПО для обмена сообщениями мы будем работать с RabbitMQ и выберем PostgreSQL в качестве СУБД для хранения определений наших конвейерных потоков.

Для запуска RabbitMQ загрузите последнюю версию здесь и запустите экземпляр RabbitMQ, используя конфигурацию по умолчанию, или выполните следующую команду Docker:

docker run --name dataflow-rabbit -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management

В качестве последнего шага установки установите и запустите СУБД PostgreSQL на порту по умолчанию. 5432. После этого создайте базу данных, в которой SCDF может хранить свои определения потоков, используя следующий сценарий:

CREATE DATABASE dataflow;

4.2. Локальный сервер Spring Cloud Data Flow

«Для запуска локального сервера SCDF мы можем запустить сервер с помощью docker-compose или запустить его как приложение Java.

Здесь мы запустим SCDF Server Local как Java-приложение. Для настройки приложения мы должны определить конфигурацию как параметры приложения Java. Нам понадобится Java 8 в системном пути.

Чтобы разместить файлы jar и зависимости, нам нужно создать домашнюю папку для нашего сервера SCDF и загрузить в эту папку локальный дистрибутив сервера SCDF. Вы можете скачать самый последний дистрибутив SCDF Server Local здесь.

Также нам нужно создать папку lib и поместить туда драйвер JDBC. Последняя версия драйвера PostgreSQL доступна здесь.

Наконец, давайте запустим локальный сервер SCDF:

$java -Dloader.path=lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \
    --spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/dataflow \
    --spring.datasource.username=postgres_username \
    --spring.datasource.password=postgres_password \
    --spring.datasource.driver-class-name=org.postgresql.Driver \
    --spring.rabbitmq.host=127.0.0.1 \
    --spring.rabbitmq.port=5672 \
    --spring.rabbitmq.username=guest \
    --spring.rabbitmq.password=guest

Мы можем проверить, работает ли он, взглянув на этот URL:

http://localhost:9393/dashboard

4.3. Spring Cloud Data Flow Shell

Оболочка SCDF — это инструмент командной строки, который упрощает создание и развертывание наших приложений и конвейеров. Эти команды оболочки выполняются через REST API Spring Cloud Data Flow Server.

Загрузите последнюю версию jar-файла в домашнюю папку SCDF, доступную здесь. Как только это будет сделано, выполните следующую команду (при необходимости обновите версию):

$ java -jar spring-cloud-dataflow-shell-1.6.3.RELEASE.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>

Если вместо «dataflow:\u003e» вы получите «server-unknown:\u003e» в последней строке, вы не запускают сервер SCDF на локальном хосте. В этом случае выполните следующую команду для подключения к другому хосту:

server-unknown:>dataflow config server http://{host}

Теперь Shell подключена к серверу SCDF, и мы можем выполнять наши команды.

Первое, что нам нужно сделать в Shell, это импортировать стартеры приложений. Найдите здесь последнюю версию RabbitMQ+Maven в Spring Boot 2.0.x и выполните следующую команду (снова обновите версию, здесь «Darwin-SR1», если необходимо):

$ dataflow:>app import --uri http://bit.ly/Darwin-SR1-stream-applications-rabbit-maven

Для проверки установленных приложений выполните следующую команду Shell:

$ dataflow:> app list

В результате мы должны увидеть таблицу, содержащую все установленные приложения.

Кроме того, SCDF предлагает графический интерфейс Flo, к которому мы можем получить доступ по этому адресу: http://localhost:9393/dashboard. Однако его использование выходит за рамки этой статьи.

5. Составление конвейера ETL

Давайте теперь создадим наш потоковый конвейер. Для этого мы будем использовать стартер приложения JDBC Source для извлечения информации из нашей реляционной базы данных.

Кроме того, мы создадим собственный процессор для преобразования информационной структуры и собственный приемник для загрузки наших данных в MongoDB.

5.1. Извлечение — подготовка реляционной базы данных к извлечению

Давайте создадим базу данных с именем crm и таблицу с именем клиента:

CREATE DATABASE crm;
CREATE TABLE customer (
    id bigint NOT NULL,
    imported boolean DEFAULT false,
    customer_name character varying(50),
    PRIMARY KEY(id)
)

Обратите внимание, что мы используем импортированный флаг, который будет хранить, какая запись уже была импортирована. Мы также можем хранить эту информацию в другой таблице, если это необходимо.

INSERT INTO customer(id, customer_name, imported) VALUES (1, 'John Doe', false);

Теперь давайте вставим некоторые данные:

5.2. Преобразование — сопоставление полей JDBC со структурой полей MongoDB

На этапе преобразования мы выполним простой перевод поля customer_name из исходной таблицы в новое имя поля. Здесь можно сделать и другие преобразования, но давайте не будем сокращать пример.

Для этого мы создадим новый проект с именем customer-transform. Самый простой способ сделать это — использовать сайт Spring Initializr для создания проекта. После перехода на веб-сайт выберите группу и имя артефакта. Мы будем использовать com.customer и customer-transform соответственно.

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

Как только это будет сделано, нажмите кнопку «Создать проект», чтобы загрузить проект. Затем разархивируйте проект и импортируйте его в свою любимую среду IDE, а затем добавьте следующую зависимость в файл pom.xml:

Теперь мы готовы приступить к кодированию преобразования имени поля. Для этого мы создадим класс Customer, который будет действовать как адаптер. Этот класс получит имя клиента с помощью метода setName() и выведет его значение с помощью метода getName.

public class Customer {

    private Long id;

    private String name;

    @JsonProperty("customer_name")
    public void setName(String name) {
        this.name = name;
    }

    @JsonProperty("name")
    public String getName() {
        return name;
    }

    // Getters and Setters
}

Аннотации @JsonProperty будут выполнять преобразование при десериализации из JSON в Java:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;

@EnableBinding(Processor.class)
public class CustomerProcessorConfiguration {

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Customer convertToPojo(Customer payload) {

        return payload;
    }
}

«

«Процессор должен получить данные от входа, выполнить преобразование и связать результат с выходным каналом. Давайте создадим для этого класс:

В приведенном выше коде мы видим, что преобразование происходит автоматически. На вход поступают данные в формате JSON, и Джексон десериализует их в объект Customer, используя заданные методы.

Для вывода наоборот, данные сериализуются в JSON с использованием методов get.

5.3. Загрузка — приемник в MongoDB

Аналогично шагу преобразования мы создадим еще один проект maven, теперь с именем customer-mongodb-sink. Снова откройте Spring Initializr, для группы выберите com.customer, а для артефакта выберите customer-mongodb-sink. Затем введите «MongoDB» в поле поиска зависимостей и загрузите проект.

Затем разархивируйте и импортируйте в свою любимую IDE.

import org.springframework.data.mongodb.core.mapping.Document;

@Document(collection="customer")
public class Customer {

    private Long id;
    private String name;

    // Getters and Setters
}

Затем добавьте ту же дополнительную зависимость, что и в проекте преобразования клиента.

@EnableBinding(Sink.class)
public class CustomerListener {

    @Autowired
    private CustomerRepository repository;

    @StreamListener(Sink.INPUT)
    public void save(Customer customer) {
        repository.save(customer);
    }
}

Теперь мы создадим еще один класс Customer для получения входных данных на этом шаге:

import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface CustomerRepository extends MongoRepository<Customer, Long> {

}

Для погружения Customer мы создадим класс Listener, который будет сохранять сущность клиента с помощью CustomerRepository:

А CustomerRepository в данном случае — это MongoRepository из Spring Data:

5.4. Определение потока

app register --name customer-transform --type processor --uri maven://com.customer:customer-transform:0.0.1-SNAPSHOT
app register --name customer-mongodb-sink --type sink --uri maven://com.customer:customer-mongodb-sink:jar:0.0.1-SNAPSHOT

Теперь оба пользовательских приложения готовы к регистрации на сервере SCDF. Для этого скомпилируйте оба проекта с помощью команды Maven mvn install.

app list

Затем мы регистрируем их с помощью Spring Cloud Data Flow Shell:

Наконец, давайте проверим, хранятся ли приложения в SCDF, запустив команду списка приложений в оболочке:

~~ ~ В итоге мы должны увидеть оба приложения в результирующей таблице.

http --port=8181 | log

5.4.1. Специфический для предметной области язык потокового конвейера — DSL

DSL определяет конфигурацию и поток данных между приложениями. SCDF DSL прост. В первом слове мы определяем имя приложения, за которым следуют конфигурации.

Кроме того, синтаксис представляет собой синтаксис конвейера, вдохновленный Unix, который использует вертикальные полосы, также известные как «каналы», для соединения нескольких приложений:

Это создает приложение HTTP, обслуживаемое через порт 8181, которое отправляет любая полученная полезная нагрузка тела в журнал.

Теперь давайте посмотрим, как создать определение потока DSL источника JDBC.

jdbc 
    --query='SELECT id, customer_name FROM public.customer WHERE imported = false'
    --update='UPDATE public.customer SET imported = true WHERE id in (:id)'
    --max-rows-per-poll=1000
    --fixed-delay=30 --time-unit=SECONDS
    --driver-class-name=org.postgresql.Driver
    --url=jdbc:postgresql://localhost:5432/crm
    --username=postgres
    --password=postgres

5.4.2. Определение исходного потока JDBC

Ключевыми конфигурациями исходного потока JDBC являются запрос и обновление. query выберет непрочитанные записи, а update изменит флаг, чтобы предотвратить повторное чтение текущих записей.

Кроме того, мы определим источник JDBC для опроса с фиксированной задержкой в ​​30 секунд и опроса максимум 1000 строк. Наконец, мы определим конфигурации подключения, такие как драйвер, имя пользователя, пароль и URL-адрес подключения:

Дополнительные свойства конфигурации JDBC Source можно найти здесь.

customer-mongodb-sink --spring.data.mongodb.uri=mongodb://localhost/main

5.4.3. Определение потока клиента MongoDB Sink

Поскольку мы не определяли конфигурации подключения в application.properties клиента-mongodb-sink, мы настроим их с помощью параметров DSL.

stream create --name jdbc-to-mongodb 
  --definition "jdbc 
  --query='SELECT id, customer_name FROM public.customer WHERE imported=false' 
  --fixed-delay=30 
  --max-rows-per-poll=1000 
  --update='UPDATE customer SET imported=true WHERE id in (:id)' 
  --time-unit=SECONDS 
  --password=postgres 
  --driver-class-name=org.postgresql.Driver 
  --username=postgres 
  --url=jdbc:postgresql://localhost:5432/crm | customer-transform | customer-mongodb-sink 
  --spring.data.mongodb.uri=mongodb://localhost/main"

Наше приложение полностью основано на MongoDataAutoConfiguration. Вы можете проверить другие возможные конфигурации здесь. По сути, мы определим spring.data.mongodb.uri:

stream deploy --name jdbc-to-mongodb

5.4.4. Создание и развертывание потока

Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink

Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-transform

Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc

Во-первых, чтобы создать окончательное определение потока, вернитесь в оболочку и выполните следующую команду (без разрывов строк, они просто вставлены для удобства чтения):

Этот поток DSL определяет поток с именем jdbc-to-mongodb. Далее мы развернем поток по его имени:

Наконец, мы должны увидеть расположение всех доступных журналов в выводе журнала:

6. Заключение