«1. Введение

В этом кратком руководстве мы обсудим различные способы объединения Observables в RxJava.

Если вы новичок в RxJava, обязательно сначала ознакомьтесь с этим вводным руководством.

Теперь давайте сразу приступим.

2. Наблюдаемые объекты

Наблюдаемые последовательности, или просто наблюдаемые, представляют собой представления асинхронных потоков данных.

Они основаны на шаблоне Observer, в котором объект, называемый Observer, подписывается на элементы, испускаемые Observable.

Подписка не блокируется, поскольку Observer может реагировать на все, что Observable будет испускать в будущем. Это, в свою очередь, облегчает параллелизм.

Вот простая демонстрация в RxJava:

Observable
  .from(new String[] { "John", "Doe" })
  .subscribe(name -> System.out.println("Hello " + name))

3. Объединение Observables

При программировании с использованием реактивного фреймворка обычным вариантом использования является объединение различных Observables.

В веб-приложении, например, нам может понадобиться получить два набора асинхронных потоков данных, которые не зависят друг от друга.

Вместо ожидания завершения предыдущего потока перед запросом следующего потока мы можем вызвать оба одновременно и подписаться на объединенные потоки.

В этом разделе мы обсудим некоторые из различных способов, которыми мы можем комбинировать несколько Observables в RxJava, и различные варианты использования, к которым применяется каждый метод.

3.1. Слияние

Мы можем использовать оператор слияния для объединения вывода нескольких Observable, чтобы они действовали как одно целое:

@Test
public void givenTwoObservables_whenMerged_shouldEmitCombinedResults() {
    TestSubscriber<String> testSubscriber = new TestSubscriber<>();

    Observable.merge(
      Observable.from(new String[] {"Hello", "World"}),
      Observable.from(new String[] {"I love", "RxJava"})
    ).subscribe(testSubscriber);

    testSubscriber.assertValues("Hello", "World", "I love", "RxJava");
}

3.2. MergeDelayError

Метод mergeDelayError аналогичен слиянию в том, что он объединяет несколько Observables в один, но если во время слияния возникают ошибки, он позволяет безошибочным элементам продолжаться до распространения ошибок:

@Test
public void givenMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError() {
    TestSubscriber<String> testSubscriber = new TestSubscriber<>();
        
    Observable.mergeDelayError(
      Observable.from(new String[] { "hello", "world" }),
      Observable.error(new RuntimeException("Some exception")),
      Observable.from(new String[] { "rxjava" })
    ).subscribe(testSubscriber);

    testSubscriber.assertValues("hello", "world", "rxjava");
    testSubscriber.assertError(RuntimeException.class);
}

Вышеупомянутое пример выдает все безошибочные значения:

hello
world
rxjava

Обратите внимание, что если мы используем слияние вместо mergeDelayError, строка «rxjava» не будет сгенерирована, потому что слияние немедленно останавливает поток данных из Observables при возникновении ошибки .

3.3. Zip

Метод расширения zip объединяет две последовательности значений в виде пар:

@Test
public void givenTwoObservables_whenZipped_thenReturnCombinedResults() {
    List<String> zippedStrings = new ArrayList<>();

    Observable.zip(
      Observable.from(new String[] { "Simple", "Moderate", "Complex" }), 
      Observable.from(new String[] { "Solutions", "Success", "Hierarchy"}),
      (str1, str2) -> str1 + " " + str2).subscribe(zippedStrings::add);
        
    assertThat(zippedStrings).isNotEmpty();
    assertThat(zippedStrings.size()).isEqualTo(3);
    assertThat(zippedStrings).contains("Simple Solutions", "Moderate Success", "Complex Hierarchy");
}

3.4. Заархивировать с интервалом

В этом примере мы заархивируем поток с интервалом, который фактически задержит передачу элементов первого потока:

@Test
public void givenAStream_whenZippedWithInterval_shouldDelayStreamEmmission() {
    TestSubscriber<String> testSubscriber = new TestSubscriber<>();
        
    Observable<String> data = Observable.just("one", "two", "three", "four", "five");
    Observable<Long> interval = Observable.interval(1L, TimeUnit.SECONDS);
        
    Observable
      .zip(data, interval, (strData, tick) -> String.format("[%d]=%s", tick, strData))
      .toBlocking().subscribe(testSubscriber);
        
    testSubscriber.assertCompleted();
    testSubscriber.assertValueCount(5);
    testSubscriber.assertValues("[0]=one", "[1]=two", "[2]=three", "[3]=four", "[4]=five");
}

4. Резюме

В этой статье мы видел несколько методов объединения Observables с RxJava. Вы можете узнать о других методах, таких как combLatest, join, groupJoin, switchOnNext, в официальной документации RxJava.

Как всегда, исходный код этой статьи доступен в нашем репозитории GitHub.