«1. Обзор

В этой статье мы рассмотрим библиотеку akka-streams, созданную на основе среды актеров Akka, которая соответствует манифесту реактивных потоков. Akka Streams API позволяет нам легко составлять потоки преобразования данных из независимых шагов.

Более того, вся обработка выполняется реактивным, неблокирующим и асинхронным способом.

2. Зависимости Maven

Для начала нам нужно добавить библиотеки akka-stream и akka-stream-testkit в наш pom.xml:

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_2.11</artifactId>
    <version>2.5.2</version>
</dependency>
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream-testkit_2.11</artifactId>
    <version>2.5.2</version>
</dependency>

3. API Akka Streams

Чтобы работая с Akka Streams, нам необходимо знать основные концепции API:

    Источник — точка входа для обработки в библиотеке akka-stream — мы можем создать экземпляр этого класса из нескольких источников; например, мы можем использовать метод single(), если мы хотим создать источник из одной строки, или мы можем создать источник из итерируемого элемента потока — основного строительного блока обработки — каждый экземпляр потока имеет одно входное и одно выходное значение Materializer — мы можем использовать один, если хотим, чтобы наш поток имел некоторые побочные эффекты, такие как ведение журнала или сохранение результатов; чаще всего мы будем передавать псевдоним NotUsed в качестве материализатора, чтобы обозначить, что наш поток не должен иметь никаких побочных эффектов. Операция приемника — когда мы создаем поток, он не выполняется до тех пор, пока мы не зарегистрируем для него операцию приемника — – это терминальная операция, запускающая все вычисления во всем потоке

4. Создание потоков в потоках Akka

Давайте начнем с создания простого примера, где мы покажем, как создавать и комбинировать несколько потоков – “ для обработки потока целых чисел и вычисления среднего скользящего окна пар целых чисел из потока.

Мы проанализируем строку целых чисел, разделенных точкой с запятой, в качестве входных данных, чтобы создать наш источник akka-stream для примера.

4.1. Использование потока для синтаксического анализа ввода

Во-первых, давайте создадим класс DataImporter, который будет принимать экземпляр ActorSystem, который мы будем использовать позже для создания нашего потока:

public class DataImporter {
    private ActorSystem actorSystem;

    // standard constructors, getters...
}

Затем давайте создадим метод parseLine, который будет генерировать список целых чисел из нашей входной строки с разделителями. Имейте в виду, что здесь мы используем Java Stream API только для синтаксического анализа:

private List<Integer> parseLine(String line) {
    String[] fields = line.split(";");
    return Arrays.stream(fields)
      .map(Integer::parseInt)
      .collect(Collectors.toList());
}

Наш исходный поток применит parseLine к нашему вводу, чтобы создать поток с типом ввода String и типом вывода Integer:

private Flow<String, Integer, NotUsed> parseContent() {
    return Flow.of(String.class)
      .mapConcat(this::parseLine);
}

Когда мы вызываем метод parseLine(), компилятор знает, что аргументом этой лямбда-функции будет String — такой же, как тип ввода для нашего Flow.

Обратите внимание, что мы используем метод mapConcat(), эквивалентный методу flatMap() в Java 8, потому что мы хотим сгладить список целых чисел, возвращаемый функцией parseLine(), в поток целых чисел, чтобы последующие шаги в нашей обработке не должны иметь дело со списком.

4.2. Использование потока для выполнения вычислений

На данный момент у нас есть поток проанализированных целых чисел. Теперь нам нужно реализовать логику, которая будет группировать все входные элементы в пары и вычислять среднее значение этих пар.

Теперь мы создадим поток целых чисел и сгруппируем их с помощью метода grouped().

Далее мы хотим рассчитать среднее значение.

Поскольку нас не интересует порядок, в котором будут обрабатываться эти средние значения, мы можем вычислять средние значения параллельно с использованием нескольких потоков с помощью метода mapAsyncUnordered(), передавая количество потоков в качестве аргумента этому методу.

Действие, которое будет передано потоку как лямбда, должно возвращать CompletableFuture, поскольку это действие будет вычисляться асинхронно в отдельном потоке:

private Flow<Integer, Double, NotUsed> computeAverage() {
    return Flow.of(Integer.class)
      .grouped(2)
      .mapAsyncUnordered(8, integers ->
        CompletableFuture.supplyAsync(() -> integers.stream()
          .mapToDouble(v -> v)
          .average()
          .orElse(-1.0)));
}

Мы вычисляем средние значения в восьми параллельных потоках. Обратите внимание, что мы используем Java 8 Stream API для вычисления среднего значения.

4.3. Объединение нескольких потоков в один

Flow API — это плавная абстракция, которая позволяет нам объединять несколько экземпляров потока для достижения нашей конечной цели обработки. У нас могут быть гранулярные потоки, где один, например, анализирует JSON, другой выполняет некоторые преобразования, а третий собирает некоторую статистику.

«Такая степень детализации поможет нам создать более тестируемый код, потому что мы можем тестировать каждый шаг обработки независимо.

Выше мы создали два потока, которые могут работать независимо друг от друга. Теперь мы хотим составить их вместе.

Во-первых, мы хотим проанализировать нашу входную строку, а затем мы хотим вычислить среднее значение для потока элементов.

Мы можем составить наши потоки, используя метод via():

Flow<String, Double, NotUsed> calculateAverage() {
    return Flow.of(String.class)
      .via(parseContent())
      .via(computeAverage());
}

Мы создали поток с типом ввода String и два других потока после него. Поток parseContent() принимает на вход строку и возвращает целое число в качестве вывода. Поток calculateAverage() берет это целое число и вычисляет среднее значение, возвращающее значение Double в качестве типа вывода.

5. Добавление стока в поток

Как мы уже упоминали, к этому моменту весь поток еще не выполняется, потому что он ленив. Чтобы начать выполнение потока, нам нужно определить приемник. Операция Sink может, например, сохранять данные в базу данных или отправлять результаты на какой-либо внешний веб-сервис.

Предположим, у нас есть класс AverageRepository со следующим методом save(), который записывает результаты в нашу базу данных:

CompletionStage<Double> save(Double average) {
    return CompletableFuture.supplyAsync(() -> {
        // write to database
        return average;
    });
}

Теперь мы хотим создать операцию Sink, которая использует этот метод для сохранения результатов нашей обработки потока. . Чтобы создать наш Sink, нам сначала нужно создать Flow, который принимает результат нашей обработки в качестве типа ввода. Далее мы хотим сохранить все наши результаты в базу данных.

Опять же, нас не волнует порядок элементов, поэтому мы можем выполнять операции save() параллельно, используя метод mapAsyncUnordered().

Чтобы создать Sink из потока, нам нужно вызвать toMat() с Sink.ignore() в качестве первого аргумента и Keep.right() в качестве второго, потому что мы хотим вернуть статус обработки: ~~ ~

private Sink<Double, CompletionStage<Done>> storeAverages() {
    return Flow.of(Double.class)
      .mapAsyncUnordered(4, averageRepository::save)
      .toMat(Sink.ignore(), Keep.right());
}

6. Определение источника для потока

Последнее, что нам нужно сделать, это создать источник из входной строки. Мы можем применить к этому источнику поток calculateAverage(), используя метод via().

Затем, чтобы добавить приемник к обработке, нам нужно вызвать метод runWith() и передать только что созданный приемник storeAverages():

CompletionStage<Done> calculateAverageForContent(String content) {
    return Source.single(content)
      .via(calculateAverage())
      .runWith(storeAverages(), ActorMaterializer.create(actorSystem))
      .whenComplete((d, e) -> {
          if (d != null) {
              System.out.println("Import finished ");
          } else {
              e.printStackTrace();
          }
      });
}

Обратите внимание, что когда обработка завершена, мы добавляем обратный вызов whenComplete(), в котором мы можем выполнить какое-либо действие в зависимости от результата обработки.

7. Тестирование Akka Streams

Мы можем протестировать нашу обработку, используя akka-stream-testkit.

Лучший способ проверить фактическую логику обработки — проверить всю логику потока и использовать TestSink для запуска вычислений и подтверждения результатов.

В нашем тесте мы создаем поток, который мы хотим протестировать, а затем мы создаем источник из тестового входного содержимого:

@Test
public void givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults() {
    // given
    Flow<String, Double, NotUsed> tested = new DataImporter(actorSystem).calculateAverage();
    String input = "1;9;11;0";

    // when
    Source<Double, NotUsed> flow = Source.single(input).via(tested);

    // then
    flow
      .runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem))
      .request(4)
      .expectNextUnordered(5d, 5.5);
}

Мы проверяем, ожидаем ли мы четыре входных аргумента, и два результата, которые являются средними, могут быть получены в любом порядке, потому что наша обработка выполняется асинхронно и параллельно.

8. Заключение

В этой статье мы рассмотрели библиотеку akka-stream.

Мы определили процесс, который объединяет несколько потоков для вычисления скользящего среднего значений элементов. Затем мы определили источник, который является точкой входа обработки потока, и приемник, запускающий фактическую обработку.

Наконец, мы написали тест для нашей обработки, используя akka-stream-testkit.

Реализацию всех этих примеров и фрагментов кода можно найти в проекте GitHub — это проект Maven, поэтому его должно быть легко импортировать и запускать как есть.