«1. Обзор

В этом руководстве мы будем использовать основы Project Reactor, чтобы изучить несколько методов создания потоков.

2. Зависимости Maven

Давайте начнем с пары зависимостей. Нам понадобится реактор-ядро и реактор-тест:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.2.6.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.2.6.RELEASE</version>
    <scope>test</scope>
</dependency>

3. Синхронная эмиссия

Самый простой способ создать поток — Flux#generate. Этот метод использует функцию генератора для создания последовательности элементов.

Но сначала давайте определим класс для хранения наших методов, иллюстрирующих метод generate:

public class SequenceGenerator {
    // methods that will follow
}

3.1. Генератор с новыми состояниями

Давайте посмотрим, как мы можем сгенерировать последовательность Фибоначчи с помощью Reactor:

public Flux<Integer> generateFibonacciWithTuples() {
    return Flux.generate(
            () -> Tuples.of(0, 1),
            (state, sink) -> {
                sink.next(state.getT1());
                return Tuples.of(state.getT2(), state.getT1() + state.getT2());
            }
    );
}

Нетрудно заметить, что этот метод generate принимает в качестве аргументов две функции — Callable и BiFunction: ~~ ~ Функция Callable устанавливает начальное состояние для генератора — в данном случае это кортежи с элементами 0 и 1. Функция BiFuntion — это генератор, потребляющий SynchronousSink, а затем испускающий элемент в каждом раунде с помощью следующего метода приемника. и текущее состояние

    Как следует из названия, объект SynchronousSink работает синхронно. Однако обратите внимание, что мы не можем вызывать метод next этого объекта более одного раза для каждого вызова генератора.

Давайте проверим сгенерированную последовательность с помощью StepVerifier:

В этом примере подписчик запрашивает всего пять элементов, поэтому сгенерированная последовательность заканчивается номером 3.

@Test
public void whenGeneratingNumbersWithTuplesState_thenFibonacciSequenceIsProduced() {
    SequenceGenerator sequenceGenerator = new SequenceGenerator();
    Flux<Integer> fibonacciFlux = sequenceGenerator.generateFibonacciWithTuples().take(5);

    StepVerifier.create(fibonacciFlux)
      .expectNext(0, 1, 1, 2, 3)
      .expectComplete()
      .verify();
}

Как мы видим, генератор возвращает новый объект состояния, который будет использоваться в следующем проходе. Впрочем, этого делать не обязательно. Вместо этого мы можем повторно использовать экземпляр состояния для всех вызовов генератора.

3.2. Генератор с изменяемым состоянием

Предположим, мы хотим сгенерировать последовательность Фибоначчи с измененным состоянием. Чтобы продемонстрировать этот вариант использования, давайте сначала определим класс:

Мы будем использовать экземпляр этого класса для хранения состояния генератора. Два свойства этого экземпляра, первое и второе, хранят два последовательных числа в последовательности.

public class FibonacciState {
    private int former;
    private int latter;

    // constructor, getters and setters
}

Если мы изменим наш первоначальный пример, мы теперь будем использовать изменяемое состояние с генерацией:

Как и в предыдущем примере, этот вариант генерации имеет параметры поставщика состояния и генератора.

public Flux<Integer> generateFibonacciWithCustomClass(int limit) {
    return Flux.generate(
      () -> new FibonacciState(0, 1),
      (state, sink) -> {
        sink.next(state.getFormer());
        if (state.getLatter() > limit) {
            sink.complete();
        }
        int temp = state.getFormer();
        state.setFormer(state.getLatter());
        state.setLatter(temp + state.getLatter());
        return state;
    });
}

Поставщик состояния типа Callable просто создает объект FibonacciState с начальными свойствами 0 и 1. Этот объект состояния будет повторно использоваться на протяжении всего жизненного цикла генератора.

Точно так же, как SynchronousSink в примере Фибоначчи с кортежами, сток здесь производит элементы один за другим. Однако, в отличие от этого примера, генератор при каждом вызове возвращает один и тот же объект состояния.

Также обратите внимание на этот раз, чтобы избежать бесконечной последовательности, мы указываем приемнику завершиться, когда произведенное значение достигнет предела.

И давайте еще раз проведем быстрый тест, чтобы убедиться, что он работает:

3.3. Вариант без сохранения состояния

@Test
public void whenGeneratingNumbersWithCustomClass_thenFibonacciSequenceIsProduced() {
    SequenceGenerator sequenceGenerator = new SequenceGenerator();

    StepVerifier.create(sequenceGenerator.generateFibonacciWithCustomClass(10))
      .expectNext(0, 1, 1, 2, 3, 5, 8)
      .expectComplete()
      .verify();
}

У метода generate есть еще один вариант только с одним параметром типа Consumer\u003cSynchronousSink\u003e. Этот вариант подходит только для создания заранее определенной последовательности, поэтому он не такой мощный. Тогда мы не будем подробно рассказывать об этом.

4. Асинхронная эмиссия

Синхронная эмиссия — не единственное решение для программного создания Flux.

Вместо этого мы можем использовать операторы create и push для создания нескольких элементов в раунде эмиссии асинхронным образом.

4.1. Метод create

Используя метод create, мы можем создавать элементы из нескольких потоков. В этом примере мы соберем элементы из двух разных источников в последовательность.

Во-первых, давайте посмотрим, чем create немного отличается от generate:

В отличие от оператора generate, метод create не поддерживает состояние. И вместо того, чтобы генерировать элементы самостоятельно, эмиттер, переданный этому методу, получает элементы из внешнего источника.

public class SequenceCreator {
    public Consumer<List<Integer>> consumer;

    public Flux<Integer> createNumberSequence() {
        return Flux.create(sink -> SequenceCreator.this.consumer = items -> items.forEach(sink::next));
    }
}

Кроме того, мы видим, что оператор create запрашивает у нас FluxSink вместо SynchronousSink. С FluxSink мы можем вызывать next() столько раз, сколько нам нужно.

В нашем случае мы будем вызывать next() для каждого элемента, который у нас есть в списке элементов, выдавая их один за другим. Вскоре мы увидим, как заполнять элементы.

«Наш внешний источник в данном случае — это воображаемое потребительское поле, хотя это может быть некий наблюдаемый API.

Давайте запустим оператор создания, начав с двух последовательностей чисел:

Эти последовательности, последовательность1 и последовательность2, будут служить источниками элементов для сгенерированной последовательности.

@Test
public void whenCreatingNumbers_thenSequenceIsProducedAsynchronously() throws InterruptedException {
    SequenceGenerator sequenceGenerator = new SequenceGenerator();
    List<Integer> sequence1 = sequenceGenerator.generateFibonacciWithTuples().take(3).collectList().block();
    List<Integer> sequence2 = sequenceGenerator.generateFibonacciWithTuples().take(4).collectList().block();

    // other statements described below
}

Далее следуют два объекта Thread, которые будут передавать элементы в издатель:

Когда вызывается оператор accept, элементы начинают поступать в источник последовательности.

SequenceCreator sequenceCreator = new SequenceCreator();
Thread producingThread1 = new Thread(
  () -> sequenceCreator.consumer.accept(sequence1)
);
Thread producingThread2 = new Thread(
  () -> sequenceCreator.consumer.accept(sequence2)
);

Затем мы можем прослушать или подписаться на нашу новую консолидированную последовательность:

Подписываясь на нашу последовательность, мы указываем, что должно произойти с каждым элементом, испускаемым последовательностью. Здесь нужно добавить каждый элемент из разрозненных источников в сводный список.

List<Integer> consolidated = new ArrayList<>();
sequenceCreator.createNumberSequence().subscribe(consolidated::add);

Теперь мы запускаем весь процесс, в котором элементы перемещаются по двум разным потокам:

Как обычно, последним шагом является проверка результата операции:

producingThread1.start();
producingThread2.start();
producingThread1.join();
producingThread2.join();

Первые три числа в полученная последовательность исходит из последовательности1, а последние четыре из последовательности2. Из-за характера асинхронных операций порядок элементов в этих последовательностях не гарантируется.

assertThat(consolidated).containsExactlyInAnyOrder(0, 1, 1, 0, 1, 1, 2);

У метода create есть еще один вариант, принимающий аргумент типа OverflowStrategy. Как следует из названия, этот аргумент управляет обратным давлением, когда нисходящий поток не может идти в ногу с издателем. По умолчанию в таком случае издатель буферизует все элементы.

4.2. Метод push

В дополнение к оператору create класс Flux имеет еще один статический метод для асинхронной генерации последовательности, а именно push. Этот метод работает так же, как create, за исключением того, что он позволяет только одному производящему потоку одновременно излучать сигналы.

Мы могли бы заменить метод create в примере, который мы только что рассмотрели, методом push, и код все равно скомпилируется

Однако иногда мы могли бы видеть ошибку утверждения, так как оператор push не позволяет FluxSink#next быть вызываются одновременно в разных потоках. В результате мы должны использовать push, только если мы не собираемся использовать несколько потоков.

5. Обработка последовательностей

Все рассмотренные нами до сих пор методы являются статическими и позволяют создавать последовательности из заданного источника. API Flux также предоставляет метод экземпляра с именем handle для обработки последовательности, созданной издателем.

Этот оператор дескриптора принимает последовательность, выполняет некоторую обработку и, возможно, удаляет некоторые элементы. В связи с этим можно сказать, что оператор дескриптора работает так же, как карта и фильтр.

Давайте взглянем на простую иллюстрацию метода handle:

В этом примере оператор handle принимает последовательность чисел, делит значение на 2, если оно четное. В случае, если значение является нечетным числом, оператор ничего не делает, то есть такое число игнорируется.

public class SequenceHandler {
    public Flux<Integer> handleIntegerSequence(Flux<Integer> sequence) {
        return sequence.handle((number, sink) -> {
            if (number % 2 == 0) {
                sink.next(number / 2);
            }
        });
    }
}

Еще одна вещь, на которую следует обратить внимание, это то, что, как и в случае с методом generate, handle использует SynchronousSink и разрешает только однократные выбросы.

И, наконец, нам нужно что-то проверить. Давайте в последний раз воспользуемся StepVerifier, чтобы убедиться, что наш обработчик работает:

Среди первых 10 элементов последовательности Фибоначчи есть четыре четных числа: 0, 2, 8 и 34, следовательно, аргументы, которые мы передаем в ОжидайтеСледующий метод.

@Test
public void whenHandlingNumbers_thenSequenceIsMappedAndFiltered() {
    SequenceHandler sequenceHandler = new SequenceHandler();
    SequenceGenerator sequenceGenerator = new SequenceGenerator();
    Flux<Integer> sequence = sequenceGenerator.generateFibonacciWithTuples().take(10);

    StepVerifier.create(sequenceHandler.handleIntegerSequence(sequence))
      .expectNext(0, 1, 4, 17)
      .expectComplete()
      .verify();
}

6. Заключение

В этой статье мы рассмотрели различные методы Flux API, которые можно использовать для создания последовательности программным способом, в частности операторы generate и create.

Исходный код этого руководства доступен на GitHub. Это проект Maven, и он должен работать как есть.

«