«1. Обзор

В этой статье мы рассмотрим различные способы объединения издателей в Project Reactor.

2. Зависимости Maven

Давайте настроим наш пример с зависимостями Project Reactor:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.1.4.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.1.4.RELEASE</version>
    <scope>test</scope>
</dependency>

3. Объединение издателей

Дан сценарий, когда приходится работать с Flux\u003cT\u003e или Mono\u003c T\u003e, есть разные способы объединения потоков.

Давайте создадим несколько примеров, иллюстрирующих использование статических методов в классе Flux\u003cT\u003e, таких как concat, concatWith, merge, zip и combLatest.

В наших примерах будут использоваться два публикатора типа Flux\u003cInteger\u003e, а именно, evenNumbers, который представляет собой Flux of Integer и содержит последовательность четных чисел, начинающуюся с 1 (минимальная переменная) и ограниченную 5 (максимальная переменная).

Мы создадим oddNumbers, а также Flux типа Integer нечетных чисел:

Flux<Integer> evenNumbers = Flux
  .range(min, max)
  .filter(x -> x % 2 == 0); // i.e. 2, 4

Flux<Integer> oddNumbers = Flux
  .range(min, max)
  .filter(x -> x % 2 > 0);  // ie. 1, 3, 5

3.1. concat()

Метод concat выполняет конкатенацию входных данных, пересылая элементы, испускаемые источниками ниже по течению.

Конкатенация достигается путем последовательной подписки на первый источник, затем ожидания ее завершения перед подпиской на следующий, и так далее, пока не завершится последний источник. Любая ошибка немедленно прерывает последовательность и передается вниз по течению.

Вот краткий пример:

@Test
public void givenFluxes_whenConcatIsInvoked_thenConcat() {
    Flux<Integer> fluxOfIntegers = Flux.concat(
      evenNumbers, 
      oddNumbers);
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(2)
      .expectNext(4)
      .expectNext(1)
      .expectNext(3)
      .expectNext(5)
      .expectComplete()
      .verify();
}

3.2. concatWith()

Используя статический метод concatWith, мы создадим в результате конкатенацию двух источников типа Flux\u003cT\u003e:

@Test
public void givenFluxes_whenConcatWithIsInvoked_thenConcatWith() {
    Flux<Integer> fluxOfIntegers = evenNumbers.concatWith(oddNumbers);
        
    // same stepVerifier as in the concat example above
}

3.3. combLatest()

Статический метод Flux combLatest будет генерировать данные, предоставленные комбинацией последнего опубликованного значения из каждого из источников Publisher.

Вот пример использования этого метода с двумя источниками Publisher и BiFunction в качестве параметров:

@Test
public void givenFluxes_whenCombineLatestIsInvoked_thenCombineLatest() {
    Flux<Integer> fluxOfIntegers = Flux.combineLatest(
      evenNumbers, 
      oddNumbers, 
      (a, b) -> a + b);

    StepVerifier.create(fluxOfIntegers)
      .expectNext(5) // 4 + 1
      .expectNext(7) // 4 + 3
      .expectNext(9) // 4 + 5
      .expectComplete()
      .verify();
}

Здесь мы видим, что функция combLatest применила функцию «a + b», используя последний элемент четныеЧисла (4) и элементы нечетныхЧисел (1,3,5), таким образом генерируя последовательность 5,7,9.

3.4. merge()

Функция слияния выполняет слияние данных из последовательностей Publisher, содержащихся в массиве, в объединенную последовательность с чередованием:

@Test
public void givenFluxes_whenMergeIsInvoked_thenMerge() {
    Flux<Integer> fluxOfIntegers = Flux.merge(
      evenNumbers, 
      oddNumbers);
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(2)
      .expectNext(4)
      .expectNext(1)
      .expectNext(3)
      .expectNext(5)
      .expectComplete()
      .verify();
}

Интересно отметить, что, в отличие от concat (ленивой подписки), Источники охотно подписаны.

Здесь мы можем увидеть другой результат функции слияния, если мы вставим задержку между элементами Publishers:

@Test
public void givenFluxes_whenMergeWithDelayedElementsIsInvoked_thenMergeWithDelayedElements() {
    Flux<Integer> fluxOfIntegers = Flux.merge(
      evenNumbers.delayElements(Duration.ofMillis(500L)), 
      oddNumbers.delayElements(Duration.ofMillis(300L)));
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(1)
      .expectNext(2)
      .expectNext(3)
      .expectNext(5)
      .expectNext(4)
      .expectComplete()
      .verify();
}

3.5. mergeSequential()

Метод mergeSequential объединяет данные из последовательностей Publisher, представленных в массиве, в упорядоченную объединенную последовательность.

В отличие от concat, на источники охотно подписываются.

Кроме того, в отличие от слияния, их выдаваемые значения объединяются в конечную последовательность в порядке подписки:

@Test
public void testMergeSequential() {
    Flux<Integer> fluxOfIntegers = Flux.mergeSequential(
      evenNumbers, 
      oddNumbers);
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(2)
      .expectNext(4)
      .expectNext(1)
      .expectNext(3)
      .expectNext(5)
      .expectComplete()
      .verify();
}

3.6. mergeDelayError()

Функция mergeDelayError объединяет данные из последовательностей Publisher, содержащихся в массиве, в чередующуюся объединенную последовательность.

В отличие от concat, на источники охотно подписываются.

Этот вариант метода статического слияния задерживает любую ошибку до тех пор, пока остальная часть невыполненной работы слияния не будет обработана.

Вот пример ошибки mergeDelayError:

@Test
public void givenFluxes_whenMergeWithDelayedElementsIsInvoked_thenMergeWithDelayedElements() {
    Flux<Integer> fluxOfIntegers = Flux.mergeDelayError(1, 
      evenNumbers.delayElements(Duration.ofMillis(500L)), 
      oddNumbers.delayElements(Duration.ofMillis(300L)));
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(1)
      .expectNext(2)
      .expectNext(3)
      .expectNext(5)
      .expectNext(4)
      .expectComplete()
      .verify();
}

3.7. mergeWith()

Статический метод mergeWith объединяет данные из этого Flux и Publisher в чередующуюся объединенную последовательность.

Опять же, в отличие от concat, внутренние источники подписываются охотно:

@Test
public void givenFluxes_whenMergeWithIsInvoked_thenMergeWith() {
    Flux<Integer> fluxOfIntegers = evenNumbers.mergeWith(oddNumbers);
        
    // same StepVerifier as in "3.4. Merge"
    StepVerifier.create(fluxOfIntegers)
      .expectNext(2)
      .expectNext(4)
      .expectNext(1)
      .expectNext(3)
      .expectNext(5)
      .expectComplete()
      .verify();
}

3.8. zip()

Статический метод zip объединяет несколько источников вместе, т. е. ожидает, пока все источники выдадут один элемент, и объединяет эти элементы в выходное значение (созданное с помощью предоставленной функции-комбинатора).

Оператор будет продолжать делать это до тех пор, пока не завершится любой из источников:

@Test
public void givenFluxes_whenZipIsInvoked_thenZip() {
    Flux<Integer> fluxOfIntegers = Flux.zip(
      evenNumbers, 
      oddNumbers, 
      (a, b) -> a + b);
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(3) // 2 + 1
      .expectNext(7) // 4 + 3
      .expectComplete()
      .verify();
}

Так как в четных числах не осталось элементов для сопряжения, элемент 5 из издателя нечетных чисел игнорируется.

3.9. zipWith()

zipWith выполняет тот же метод, что и zip, но только с двумя издателями:

@Test
public void givenFluxes_whenZipWithIsInvoked_thenZipWith() {
    Flux<Integer> fluxOfIntegers = evenNumbers
     .zipWith(oddNumbers, (a, b) -> a * b);
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(2)  // 2 * 1
      .expectNext(12) // 4 * 3
      .expectComplete()
      .verify();
}

4. Заключение

В этом кратком руководстве мы обнаружили несколько способов объединения издателей.

Как всегда, примеры доступны на GitHub.