«1. Введение
В этом кратком руководстве мы обсудим различные способы объединения Observables в RxJava.
Если вы новичок в RxJava, обязательно сначала ознакомьтесь с этим вводным руководством.
Теперь давайте сразу приступим.
2. Наблюдаемые объекты
Наблюдаемые последовательности, или просто наблюдаемые, представляют собой представления асинхронных потоков данных.
Они основаны на шаблоне Observer, в котором объект, называемый Observer, подписывается на элементы, испускаемые Observable.
Подписка не блокируется, поскольку Observer может реагировать на все, что Observable будет испускать в будущем. Это, в свою очередь, облегчает параллелизм.
Вот простая демонстрация в RxJava:
Observable
.from(new String[] { "John", "Doe" })
.subscribe(name -> System.out.println("Hello " + name))
3. Объединение Observables
При программировании с использованием реактивного фреймворка обычным вариантом использования является объединение различных Observables.
В веб-приложении, например, нам может понадобиться получить два набора асинхронных потоков данных, которые не зависят друг от друга.
Вместо ожидания завершения предыдущего потока перед запросом следующего потока мы можем вызвать оба одновременно и подписаться на объединенные потоки.
В этом разделе мы обсудим некоторые из различных способов, которыми мы можем комбинировать несколько Observables в RxJava, и различные варианты использования, к которым применяется каждый метод.
3.1. Слияние
Мы можем использовать оператор слияния для объединения вывода нескольких Observable, чтобы они действовали как одно целое:
@Test
public void givenTwoObservables_whenMerged_shouldEmitCombinedResults() {
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
Observable.merge(
Observable.from(new String[] {"Hello", "World"}),
Observable.from(new String[] {"I love", "RxJava"})
).subscribe(testSubscriber);
testSubscriber.assertValues("Hello", "World", "I love", "RxJava");
}
3.2. MergeDelayError
Метод mergeDelayError аналогичен слиянию в том, что он объединяет несколько Observables в один, но если во время слияния возникают ошибки, он позволяет безошибочным элементам продолжаться до распространения ошибок:
@Test
public void givenMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError() {
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
Observable.mergeDelayError(
Observable.from(new String[] { "hello", "world" }),
Observable.error(new RuntimeException("Some exception")),
Observable.from(new String[] { "rxjava" })
).subscribe(testSubscriber);
testSubscriber.assertValues("hello", "world", "rxjava");
testSubscriber.assertError(RuntimeException.class);
}
Вышеупомянутое пример выдает все безошибочные значения:
hello
world
rxjava
Обратите внимание, что если мы используем слияние вместо mergeDelayError, строка «rxjava» не будет сгенерирована, потому что слияние немедленно останавливает поток данных из Observables при возникновении ошибки .
3.3. Zip
Метод расширения zip объединяет две последовательности значений в виде пар:
@Test
public void givenTwoObservables_whenZipped_thenReturnCombinedResults() {
List<String> zippedStrings = new ArrayList<>();
Observable.zip(
Observable.from(new String[] { "Simple", "Moderate", "Complex" }),
Observable.from(new String[] { "Solutions", "Success", "Hierarchy"}),
(str1, str2) -> str1 + " " + str2).subscribe(zippedStrings::add);
assertThat(zippedStrings).isNotEmpty();
assertThat(zippedStrings.size()).isEqualTo(3);
assertThat(zippedStrings).contains("Simple Solutions", "Moderate Success", "Complex Hierarchy");
}
3.4. Заархивировать с интервалом
В этом примере мы заархивируем поток с интервалом, который фактически задержит передачу элементов первого потока:
@Test
public void givenAStream_whenZippedWithInterval_shouldDelayStreamEmmission() {
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
Observable<String> data = Observable.just("one", "two", "three", "four", "five");
Observable<Long> interval = Observable.interval(1L, TimeUnit.SECONDS);
Observable
.zip(data, interval, (strData, tick) -> String.format("[%d]=%s", tick, strData))
.toBlocking().subscribe(testSubscriber);
testSubscriber.assertCompleted();
testSubscriber.assertValueCount(5);
testSubscriber.assertValues("[0]=one", "[1]=two", "[2]=three", "[3]=four", "[4]=five");
}
4. Резюме
В этой статье мы видел несколько методов объединения Observables с RxJava. Вы можете узнать о других методах, таких как combLatest, join, groupJoin, switchOnNext, в официальной документации RxJava.
Как всегда, исходный код этой статьи доступен в нашем репозитории GitHub.