«1. Обзор

В этой статье мы узнаем о некоторых служебных операторах для работы с Observables в RxJava и о том, как реализовать собственные.

Оператор — это функция, которая принимает и изменяет поведение вышестоящего Observable\u003cT\u003e и возвращает нижестоящий Observable\u003cR\u003e или Subscriber, где типы T и R могут совпадать, а могут и не совпадать.

Операторы оборачивают существующие Observable и улучшают их, как правило, путем перехвата подписки. Это может показаться сложным, но на самом деле это довольно гибко и не так сложно понять.

2. Сделайте

Есть несколько действий, которые могут изменить события жизненного цикла Observable.

Оператор doOnNext модифицирует исходный код Observable, чтобы он вызывал действие при вызове onNext.

Оператор doOnCompleted регистрирует действие, которое вызывается, если результирующий Observable завершается нормально, вызывая метод Observer onCompleted:

Observable.range(1, 10)
  .doOnNext(r -> receivedTotal += r)
  .doOnCompleted(() -> result = "Completed")
  .subscribe();
 
assertTrue(receivedTotal == 55);
assertTrue(result.equals("Completed"));

Оператор doOnEach модифицирует источник Observable, чтобы он уведомлял Observer о каждом элементе и устанавливает обратный вызов, который будет вызываться каждый раз при создании элемента.

Оператор doOnSubscribe регистрирует действие, которое вызывается всякий раз, когда Observer подписывается на полученный Observable.

Есть также оператор doOnUnsubscribe, который делает противоположное doOnSubscribe:

Observable.range(1, 10)
  .doOnEach(new Observer<Integer>() {
      @Override
      public void onCompleted() {
          System.out.println("Complete");
      }
      @Override
      public void onError(Throwable e) {
          e.printStackTrace();
      }
      @Override
      public void onNext(Integer value) {
          receivedTotal += value;
      }
  })
  .doOnSubscribe(() -> result = "Subscribed")
  .subscribe();
assertTrue(receivedTotal == 55);
assertTrue(result.equals("Subscribed"));

Когда Observable завершается с ошибкой, мы можем использовать оператор doOnError для выполнения действия.

Оператор DoOnTerminate регистрирует действие, которое будет вызвано, когда Observable завершится успешно или с ошибкой:

thrown.expect(OnErrorNotImplementedException.class);
Observable.empty()
  .single()
  .doOnError(throwable -> { throw new RuntimeException("error");})
  .doOnTerminate(() -> result += "doOnTerminate")
  .doAfterTerminate(() -> result += "_doAfterTerminate")
  .subscribe();
assertTrue(result.equals("doOnTerminate_doAfterTerminate"));

Существует также оператор finallyDo, который был объявлен устаревшим в пользу doAfterTerminate. Он регистрирует действие, когда Observable завершается.

3. ObserveOn vs SubscribeOn

По умолчанию Observable вместе с цепочкой операторов будет работать в том же потоке, в котором вызывается его метод Subscribe.

Оператор ObserveOn указывает другой планировщик, который Observable будет использовать для отправки уведомлений наблюдателям:

Observable.range(1, 5)
  .map(i -> i * 100)
  .doOnNext(i -> {
      emittedTotal += i;
      System.out.println("Emitting " + i
        + " on thread " + Thread.currentThread().getName());
  })
  .observeOn(Schedulers.computation())
  .map(i -> i * 10)
  .subscribe(i -> {
      receivedTotal += i;
      System.out.println("Received " + i + " on thread "
        + Thread.currentThread().getName());
  });

Thread.sleep(2000);
assertTrue(emittedTotal == 1500);
assertTrue(receivedTotal == 15000);

Мы видим, что элементы были созданы в основном потоке и были отправлены до первого вызова карты.

Но после этогоObservOn перенаправил обработку в поток вычислений, который использовался при обработке карты и финального подписчика.

Одна проблема, которая может возникнуть при использованииObservOn, заключается в том, что нижний поток может производить выбросы быстрее, чем верхний поток может их обработать. Это может вызвать проблемы с обратным давлением, которые нам, возможно, придется учитывать.

Чтобы указать, с каким планировщиком должен работать Observable, мы можем использовать оператор subscribeOn:

Observable.range(1, 5)
  .map(i -> i * 100)
  .doOnNext(i -> {
      emittedTotal += i;
      System.out.println("Emitting " + i
        + " on thread " + Thread.currentThread().getName());
  })
  .subscribeOn(Schedulers.computation())
  .map(i -> i * 10)
  .subscribe(i -> {
      receivedTotal += i;
      System.out.println("Received " + i + " on thread "
        + Thread.currentThread().getName());
  });

Thread.sleep(2000);
assertTrue(emittedTotal == 1500);
assertTrue(receivedTotal == 15000);

SubscribeOn инструктирует исходный Observable, какой поток использовать для отправки элементов — только этот поток будет отправлять элементы подписчику. . Его можно разместить в любом месте потока, потому что он влияет только на подписку.

По сути, мы можем использовать только один subscribeOn, но у нас может быть любое количество операторовObservOn. Мы можем легко переключать выбросы с одного потока на другой, используяObservOn.

4. Single и SingleOrDefault

Оператор Single возвращает Observable, который испускает один элемент, испускаемый исходным Observable:

Observable.range(1, 1)
  .single()
  .subscribe(i -> receivedTotal += i);
assertTrue(receivedTotal == 1);

Если исходный Observable производит ноль или более одного элемента, исключение будет брошенный:

Observable.empty()
  .single()
  .onErrorReturn(e -> receivedTotal += 10)
  .subscribe();
assertTrue(receivedTotal == 10);

С другой стороны, оператор SingleOrDefault очень похож на Single, а это означает, что он также возвращает Observable, который испускает один элемент из источника, но дополнительно мы можем указать значение по умолчанию: ~~ ~

Observable.empty()
  .singleOrDefault("Default")
  .subscribe(i -> result +=i);
assertTrue(result.equals("Default"));

Но если исходный объект Observable генерирует более одного элемента, он все равно выдает исключение IllegalArgumentExeption:

Observable.range(1, 3)
  .singleOrDefault(5)
  .onErrorReturn(e -> receivedTotal += 10)
  .subscribe();
assertTrue(receivedTotal == 10);

S простой вывод:

    Если ожидается, что исходный объект Observable может не содержать ни одного элемента или иметь только один элемент, то Следует использовать SingleOrDefault. Если мы имеем дело с потенциально более чем одним элементом, выдаваемым в нашем Observable, и мы хотим выдать только первое или последнее значение, мы можем использовать другие операторы, такие как first или last

5. Отметка времени ~~ ~»

«Оператор Timestamp прикрепляет метку времени к каждому элементу, испускаемому исходным Observable, перед тем, как повторно передать этот элемент в своей собственной последовательности. Временная метка указывает, в какое время был отправлен элемент:

Observable.range(1, 10)
  .timestamp()
  .map(o -> result = o.getClass().toString() )
  .last()
  .subscribe();
 
assertTrue(result.equals("class rx.schedulers.Timestamped"));

6. Задержка

Этот оператор модифицирует свой исходный Observable, делая паузу на определенный интервал времени перед испусканием каждого из исходных наблюдаемых элементов.

Он смещает всю последовательность, используя предоставленное значение:

Observable source = Observable.interval(1, TimeUnit.SECONDS)
  .take(5)
  .timestamp();

Observable delayedObservable
  = source.delay(2, TimeUnit.SECONDS);

source.subscribe(
  value -> System.out.println("source :" + value),
  t -> System.out.println("source error"),
  () -> System.out.println("source completed"));

delayedObservable.subscribe(
  value -> System.out.println("delay : " + value),
  t -> System.out.println("delay error"),
  () -> System.out.println("delay completed"));
Thread.sleep(8000);

Существует альтернативный оператор, с помощью которого мы можем отложить подписку на исходный Observable, который называется delaySubscription.

Оператор задержки запускается в планировщике вычислений по умолчанию, но мы можем выбрать другой планировщик, передав его в качестве необязательного третьего параметра в delaySubscription.

7. Repeat

Repeat просто перехватывает уведомление о завершении от восходящего потока и вместо того, чтобы передавать его нисходящему потоку, повторно подписывается.

Таким образом, не гарантируется, что повторение будет повторять одну и ту же последовательность событий, но бывает так, что восходящий поток является фиксированным:

Observable.range(1, 3)
  .repeat(3)
  .subscribe(i -> receivedTotal += i);
 
assertTrue(receivedTotal == 18);

8. Кэш

Оператор кэширования стоит между подпиской и нашим кастомным Observable.

Когда появляется первый подписчик, кеш делегирует подписку базовому Observable и пересылает все уведомления (события, завершения или ошибки) вниз по течению.

Однако в то же время он хранит копии всех уведомлений внутри. Когда последующий подписчик хочет получать push-уведомления, кеш больше не делегирует базовый Observable, а вместо этого передает кэшированные значения:

Observable<Integer> source =
  Observable.<Integer>create(subscriber -> {
      System.out.println("Create");
      subscriber.onNext(receivedTotal += 5);
      subscriber.onCompleted();
  }).cache();
source.subscribe(i -> {
  System.out.println("element 1");
  receivedTotal += 1;
});
source.subscribe(i -> {
  System.out.println("element 2");
  receivedTotal += 2;
});
 
assertTrue(receivedTotal == 8);

9. Использование

Когда наблюдатель подписывается на Observable, возвращенный из using(), он будет использовать функцию фабрики Observable для создания объекта Observable, который наблюдатель будет… наблюдать, и в то же время использовать функцию фабрики ресурсов для создания любого ресурса, для которого мы его разработали.

Когда наблюдатель отписывается от Observable или когда Observable завершается, использование вызовет третью функцию для удаления созданного ресурса:

Observable<Character> values = Observable.using(
  () -> "resource",
  r -> {
      return Observable.create(o -> {
          for (Character c : r.toCharArray()) {
              o.onNext(c);
          }
          o.onCompleted();
      });
  },
  r -> System.out.println("Disposed: " + r)
);
values.subscribe(
  v -> result += v,
  e -> result += e
);
assertTrue(result.equals("resource"));

10. Заключение

В этой статье мы говорили, как использовать служебные операторы RxJava, а также узнать, как изучить их наиболее важные функции.

Истинная мощь RxJava заключается в ее операторах. Декларативные преобразования потоков данных безопасны, но выразительны и гибки.

Имея прочную основу функционального программирования, операторы играют решающую роль в принятии RxJava. Освоение встроенных операторов — ключ к успеху в этой библиотеке.

Полный исходный код проекта, включая все используемые здесь примеры кода, можно найти на GitHub.