«1. Введение

RxJava — это реализация Reactive Extensions Java, которая позволяет нам писать управляемые событиями и асинхронные приложения. Дополнительную информацию о том, как использовать RxJava, можно найти в нашей вводной статье здесь.

RxJava 2 был переписан с нуля, что принесло множество новых функций; некоторые из них были созданы в ответ на проблемы, существовавшие в предыдущей версии фреймворка.

Одной из таких функций является io.reactivex.Flowable.

2. Observable и Flowable

В предыдущей версии RxJava был только один базовый класс для работы с источниками, поддерживающими и не поддерживающими противодавление, — Observable.

В RxJava 2 введено четкое различие между этими двумя типами источников — источники, поддерживающие противодавление, теперь представлены с помощью специального класса — Flowable.

Наблюдаемые источники не поддерживают обратное давление. Из-за этого мы должны использовать его для источников, которые мы просто потребляем и на которые не можем повлиять.

Также, если мы имеем дело с большим количеством элементов, могут возникнуть два возможных сценария, связанных с противодавлением, в зависимости от типа Observable.

В случае использования так называемого «холодного наблюдателя» события генерируются лениво, поэтому мы защищены от переполнения наблюдателя.

Однако при использовании «горячего Observable» это будет продолжать генерировать события, даже если потребитель не может идти в ногу.

3. Создание Flowable

Существуют разные способы создания Flowable. Для нас удобно, что эти методы похожи на методы в Observable в первой версии RxJava.

3.1. Simple Flowable

Мы можем создать Flowable с помощью метода just() так же, как и с Observable:

Flowable<Integer> integerFlowable = Flowable.just(1, 2, 3, 4);

Несмотря на то, что использовать just() довольно просто, создание Flowable не очень распространено. из статических данных и используется в целях тестирования.

3.2. Flowable из Observable

Когда у нас есть Observable, мы можем легко преобразовать его в Flowable с помощью метода toFlowable():

Observable<Integer> integerObservable = Observable.just(1, 2, 3);
Flowable<Integer> integerFlowable = integerObservable
  .toFlowable(BackpressureStrategy.BUFFER);

Обратите внимание, что для выполнения преобразования нам нужно обогатить Observable стратегией противодавления. . Мы опишем доступные стратегии в следующем разделе.

3.3. Flowable из FlowableOnSubscribe

RxJava 2 представил функциональный интерфейс FlowableOnSubscribe, который представляет Flowable, который начинает генерировать события после того, как потребитель подпишется на него.

В связи с этим все клиенты будут получать одинаковый набор событий, что делает FlowableOnSubscribe безопасным от обратного давления.

Когда у нас есть FlowableOnSubscribe, мы можем использовать его для создания Flowable:

FlowableOnSubscribe<Integer> flowableOnSubscribe
 = flowable -> flowable.onNext(1);
Flowable<Integer> integerFlowable = Flowable
  .create(flowableOnSubscribe, BackpressureStrategy.BUFFER);

В документации описано много других методов создания Flowable.

4. Flowable BackpressureStrategy

Некоторые методы, такие как toFlowable() или create(), принимают BackpressureStrategy в качестве аргумента.

BackpressureStrategy — это перечисление, определяющее поведение противодавления, которое мы применим к нашему Flowable.

Он может кэшировать или отбрасывать события или вообще не реализовывать какое-либо поведение, в последнем случае мы будем нести ответственность за его определение, используя операторы обратного давления.

BackpressureStrategy похож на BackpressureMode, присутствующий в предыдущей версии RxJava.

В RxJava 2 доступно пять различных стратегий.

4.1. Буфер

Если мы используем BackpressureStrategy.BUFFER, источник будет буферизовать все события до тех пор, пока подписчик не сможет их использовать:

public void thenAllValuesAreBufferedAndReceived() {
    List testList = IntStream.range(0, 100000)
      .boxed()
      .collect(Collectors.toList());
 
    Observable observable = Observable.fromIterable(testList);
    TestSubscriber<Integer> testSubscriber = observable
      .toFlowable(BackpressureStrategy.BUFFER)
      .observeOn(Schedulers.computation()).test();

    testSubscriber.awaitTerminalEvent();

    List<Integer> receivedInts = testSubscriber.getEvents()
      .get(0)
      .stream()
      .mapToInt(object -> (int) object)
      .boxed()
      .collect(Collectors.toList());

    assertEquals(testList, receivedInts);
}

Это похоже на вызов метода onBackpressureBuffer() в Flowable, но не позволяет явно определить размер буфера или действие onOverflow.

4.2. Drop

Мы можем использовать BackpressureStrategy.DROP, чтобы отбрасывать события, которые нельзя использовать, вместо их буферизации.

Опять же, это похоже на использование onBackpressureDrop() в Flowable:

public void whenDropStrategyUsed_thenOnBackpressureDropped() {
   
    Observable observable = Observable.fromIterable(testList);
    TestSubscriber<Integer> testSubscriber = observable
      .toFlowable(BackpressureStrategy.DROP)
      .observeOn(Schedulers.computation())
      .test();
    testSubscriber.awaitTerminalEvent();
    List<Integer> receivedInts = testSubscriber.getEvents()
      .get(0)
      .stream()
      .mapToInt(object -> (int) object)
      .boxed()
      .collect(Collectors.toList());

    assertThat(receivedInts.size() < testList.size());
    assertThat(!receivedInts.contains(100000));
 }

4.3. Latest

Использование BackpressureStrategy.LATEST заставит источник сохранять только самые последние события, тем самым перезаписывая любые предыдущие значения, если потребитель не может идти в ногу:

public void whenLatestStrategyUsed_thenTheLastElementReceived() {
  
    Observable observable = Observable.fromIterable(testList);
    TestSubscriber<Integer> testSubscriber = observable
      .toFlowable(BackpressureStrategy.LATEST)
      .observeOn(Schedulers.computation())
      .test();

    testSubscriber.awaitTerminalEvent();
    List<Integer> receivedInts = testSubscriber.getEvents()
      .get(0)
      .stream()
      .mapToInt(object -> (int) object)
      .boxed()
      .collect(Collectors.toList());

    assertThat(receivedInts.size() < testList.size());
    assertThat(receivedInts.contains(100000));
 }

BackpressureStrategy.LATEST и BackpressureStrategy.DROP выглядят очень похоже когда мы смотрим на код.

«Однако BackpressureStrategy.LATEST перезапишет элементы, которые наш подписчик не может обработать, и сохранит только самые последние, отсюда и название.

BackpressureStrategy.DROP, с другой стороны, отбрасывает элементы, которые не могут быть обработаны. Это означает, что новейшие элементы не обязательно будут испускаться.

4.4. Ошибка

Когда мы используем BackpressureStrategy.ERROR, мы просто говорим, что не ожидаем возникновения обратного давления. Следовательно, если потребитель не может идти в ногу с источником, должно быть сгенерировано исключение MissingBackpressureException:

public void whenErrorStrategyUsed_thenExceptionIsThrown() {
    Observable observable = Observable.range(1, 100000);
    TestSubscriber subscriber = observable
      .toFlowable(BackpressureStrategy.ERROR)
      .observeOn(Schedulers.computation())
      .test();

    subscriber.awaitTerminalEvent();
    subscriber.assertError(MissingBackpressureException.class);
}

4.5. Отсутствует

Если мы используем BackpressureStrategy.MISSING, источник будет помещать элементы без отбрасывания или буферизации.

В этом случае нисходящему потоку придется иметь дело с переполнением:

public void whenMissingStrategyUsed_thenException() {
    Observable observable = Observable.range(1, 100000);
    TestSubscriber subscriber = observable
      .toFlowable(BackpressureStrategy.MISSING)
      .observeOn(Schedulers.computation())
      .test();
    subscriber.awaitTerminalEvent();
    subscriber.assertError(MissingBackpressureException.class);
}

В наших тестах мы исключаем MissingbackpressureException как для стратегий ERROR, так и для стратегий MISSING. Поскольку оба они будут генерировать такое исключение, когда внутренний буфер источника будет переполнен.

Однако стоит отметить, что оба они имеют разное назначение.

Мы должны использовать первый, когда мы вообще не ожидаем обратного давления, и мы хотим, чтобы источник выдавал исключение в случае его возникновения.

Последний можно использовать, если мы не хотим указывать поведение по умолчанию при создании Flowable. И мы собираемся использовать операторы обратного давления, чтобы определить это позже.

5. Резюме

В этом уроке мы представили новый класс, представленный в RxJava 2, под названием Flowable.

Чтобы найти больше информации о самом Flowable и его API, мы можем обратиться к документации.

Как всегда, все образцы кода можно найти на GitHub.