«1. Введение

Spring Cloud Data Flow — это набор инструментов для построения конвейеров интеграции и обработки данных в реальном времени.

Конвейеры в данном случае — это приложения Spring Boot, созданные с использованием фреймворков Spring Cloud Stream или Spring Cloud Task.

В этом руководстве мы покажем, как использовать Spring Cloud Data Flow с Apache Spark.

2. Локальный сервер потока данных

Во-первых, нам нужно запустить сервер потока данных, чтобы иметь возможность развертывать наши задания.

Чтобы запустить Data Flow Server локально, нам нужно создать новый проект с зависимостью spring-cloud-starter-dataflow-server-local:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
    <version>1.7.4.RELEASE</version>
</dependency>

После этого нам нужно аннотировать основной класс в сервер с @EnableDataFlowServer:

@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowServerApplication.class, args);
    }
}

После того, как мы запустим это приложение, у нас будет локальный сервер потока данных на порту 9393.

3. Создание проекта

Мы создадим Spark Job как автономное локальное приложение, так что нам не понадобится какой-либо кластер для его запуска.

3.1. Зависимости

Сначала мы добавим зависимость Spark:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.4.0</version>
</dependency>

3.2. Создание задания

И для нашей работы давайте аппроксимируем пи:

public class PiApproximation {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("BaeldungPIApproximation");
        JavaSparkContext context = new JavaSparkContext(conf);
        int slices = args.length >= 1 ? Integer.valueOf(args[0]) : 2;
        int n = (100000L * slices) > Integer.MAX_VALUE ? Integer.MAX_VALUE : 100000 * slices;

        List<Integer> xs = IntStream.rangeClosed(0, n)
          .mapToObj(element -> Integer.valueOf(element))
          .collect(Collectors.toList());

        JavaRDD<Integer> dataSet = context.parallelize(xs, slices);

        JavaRDD<Integer> pointsInsideTheCircle = dataSet.map(integer -> {
           double x = Math.random() * 2 - 1;
           double y = Math.random() * 2 - 1;
           return (x * x + y * y ) < 1 ? 1: 0;
        });

        int count = pointsInsideTheCircle.reduce((integer, integer2) -> integer + integer2);

        System.out.println("The pi was estimated as:" + count / n);

        context.stop();
    }
}

4. Data Flow Shell

Data Flow Shell — это приложение, которое позволит нам взаимодействовать с сервером. Shell использует команды DSL для описания потоков данных.

Чтобы использовать оболочку потока данных, нам нужно создать проект, который позволит нам запустить ее. Во-первых, нам нужна зависимость spring-cloud-dataflow-shell:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dataflow-shell</artifactId>
    <version>1.7.4.RELEASE</version>
</dependency>

После добавления зависимости мы можем создать класс, который будет запускать нашу оболочку потока данных:

@EnableDataFlowShell
@SpringBootApplication
public class SpringDataFlowShellApplication {
     
    public static void main(String[] args) {
        SpringApplication.run(SpringDataFlowShellApplication.class, args);
    }
}

5. Развертывание проекта

Для развертывания нашего проекта мы воспользуемся так называемым средством запуска задач, которое доступно для Apache Spark в трех версиях: кластер, пряжа и клиент. Мы собираемся продолжить работу с локальной версией клиента.

Средство запуска задач — это то, что запускает наше задание Spark.

Для этого нам сначала нужно зарегистрировать нашу задачу с помощью Data Flow Shell:

app register --type task --name spark-client --uri maven://org.springframework.cloud.task.app:spark-client-task:1.0.0.BUILD-SNAPSHOT

Задача позволяет нам указать несколько различных параметров, некоторые из которых являются необязательными, но некоторые параметры необходимы для развертывания Правильная работа Spark:

    spark.app-class, основной класс нашего отправленного задания. spark.app-jar, путь к толстому банку, содержащему наше задание. spark.app-name, имя, которое будет использоваться для нашего задания. job spark.app-args, аргументы, которые будут переданы заданию

Мы можем использовать зарегистрированную задачу spark-client для отправки нашей задачи, не забыв указать необходимые параметры:

task create spark1 --definition "spark-client \
  --spark.app-name=my-test-pi --spark.app-class=com.baeldung.spring.cloud.PiApproximation \
  --spark.app-jar=/apache-spark-job-0.0.1-SNAPSHOT.jar --spark.app-args=10"

Обратите внимание, что spark .app-jar — это путь к фэт-банке с нашей работой.

После успешного создания задачи мы можем приступить к ее запуску с помощью следующей команды:

task launch spark1

Это вызовет выполнение нашей задачи.

6. Резюме

В этом руководстве мы показали, как использовать платформу Spring Cloud Data Flow для обработки данных с помощью Apache Spark. Дополнительную информацию о среде Spring Cloud Data Flow можно найти в документации.

Все примеры кода можно найти на GitHub.