«1. Введение

После введения в RxJava мы рассмотрим операторы фильтрации.

В частности, мы сосредоточимся на фильтрации, пропуске, временной фильтрации и некоторых более сложных операциях фильтрации.

2. Фильтрация

При работе с Observable иногда полезно выбрать только подмножество испускаемых элементов. Для этой цели RxJava предлагает различные возможности фильтрации.

Давайте начнем с метода фильтра.

2.1. Оператор фильтра

Проще говоря, оператор фильтра фильтрует Observable, гарантируя, что испускаемые элементы соответствуют указанному условию, которое приходит в форме предиката.

Давайте посмотрим, как мы можем отфильтровать только нечетные значения из выдаваемых:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable
  .filter(i -> i % 2 != 0);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(1, 3, 5, 7, 9);

2.2. Оператор взятия

При фильтрации с помощью взятия логика приводит к выдаче первых n элементов при игнорировании остальных элементов.

Давайте посмотрим, как мы можем фильтровать sourceObservable и выдавать только первые два элемента:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.take(3);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 3);

2.3. Оператор takeWhile

При использовании takeWhile отфильтрованный Observable будет продолжать испускать элементы, пока не встретит первый элемент, который не соответствует предикату.

Давайте посмотрим, как мы можем использовать takeWhile — с предикатом фильтрации:

Observable<Integer> sourceObservable = Observable.just(1, 2, 3, 4, 3, 2, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable
  .takeWhile(i -> i < 4);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 3);

2.4. Оператор takeFirst

Всякий раз, когда мы хотим выдать только первый элемент, соответствующий заданному условию, мы можем использовать takeFirst().

Давайте быстро посмотрим, как мы можем выдать первый элемент, который больше 5:

Observable<Integer> sourceObservable = Observable
  .just(1, 2, 3, 4, 5, 7, 6);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable
  .takeFirst(x -> x > 5);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(7);

2.5. Операторы first и firstOrDefault

Аналогичного поведения можно добиться с помощью первого API:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.first();

filteredObservable.subscribe(subscriber);

subscriber.assertValue(1);

Однако, если мы хотим указать значение по умолчанию, если элементы не создаются, мы можем использовать firstOrDefault:

Observable<Integer> sourceObservable = Observable.empty();

Observable<Integer> filteredObservable = sourceObservable.firstOrDefault(-1);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(-1);

2.6. Оператор takeLast

Далее, если мы хотим сгенерировать только последние n элементов, сгенерированных Observable, мы можем использовать takeLast.

Давайте посмотрим, как можно выдать только последние три элемента:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.takeLast(3);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(8, 9, 10);

Мы должны помнить, что это задерживает выдачу любого элемента из исходного Observable до тех пор, пока он не завершится.

2.7. last и lastOrDefault

Если мы хотим вывести только последний элемент, кроме использования takeLast(1), мы можем использовать last.

Это фильтрует Observable, выдавая только последний элемент, который дополнительно проверяет предикат фильтрации:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable
  .last(i -> i % 2 != 0);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(9);

В случае, если Observable пуст, мы можем использовать lastOrDefault, который фильтрует Observable, выдавая значение по умолчанию.

Значение по умолчанию также выдается, если используется оператор lastOrDefault и нет элементов, проверяющих условие фильтрации:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = 
  sourceObservable.lastOrDefault(-1, i -> i > 10);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(-1);

2.8. Операторы elementAt и elementAtOrDefault

С помощью оператора elementAt мы можем выбрать один элемент, созданный исходным Observable, указав его индекс:

Observable<Integer> sourceObservable = Observable
  .just(1, 2, 3, 5, 7, 11);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.elementAt(4);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(7);

Однако elementAt выдаст исключение IndexOutOfBoundException, если указанный индекс превышает количество элементов. излучаемый.

Чтобы избежать этой ситуации, можно использовать elementAtOrDefault, который будет возвращать значение по умолчанию, если индекс выходит за пределы допустимого диапазона:

Observable<Integer> sourceObservable = Observable
  .just(1, 2, 3, 5, 7, 11);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable
 = sourceObservable.elementAtOrDefault(7, -1);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(-1);

2.9. Оператор ofType

Всякий раз, когда Observable испускает элементы Object, их можно фильтровать на основе их типа.

Давайте посмотрим, как мы можем фильтровать только испускаемые элементы типа String:

Observable sourceObservable = Observable.just(1, "two", 3, "five", 7, 11);
TestSubscriber subscriber = new TestSubscriber();

Observable filteredObservable = sourceObservable.ofType(String.class);

filteredObservable.subscribe(subscriber);

subscriber.assertValues("two", "five");

3. Пропуск

С другой стороны, когда мы хотим отфильтровать или пропустить некоторые элементы, испускаемые Observable, RxJava предлагает несколько операторов в качестве аналога операторов фильтрации, которые мы обсуждали ранее.

Давайте начнем с оператора skip, аналога take.

3.1. Оператор skip

Когда Observable выдает последовательность элементов, можно отфильтровать или пропустить некоторые из первых выданных элементов с помощью skip.

Например. давайте посмотрим, как можно пропустить первые четыре элемента:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.skip(4);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(5, 6, 7, 8, 9, 10);

3.2. Оператор skipWhile

Всякий раз, когда мы хотим отфильтровать все первые значения, выдаваемые Observable, которые не соответствуют предикату фильтрации, мы можем использовать оператор skipWhile:

Observable<Integer> sourceObservable = Observable
  .just(1, 2, 3, 4, 5, 4, 3, 2, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable
  .skipWhile(i -> i < 4);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(4, 5, 4, 3, 2, 1);

3.3. Оператор skipLast

«Оператор skipLast позволяет нам пропускать конечные элементы, испускаемые Observable, принимая только те, которые были испущены до них.

При этом мы можем, например, пропустить последние пять пунктов:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.skipLast(5);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 3, 4, 5);

3.4. Отдельные и отличные операторы UntilChanged

Оператор Different возвращает Observable, который испускает все элементы, испускаемые sourceObservable, которые являются различными:

Observable<Integer> sourceObservable = Observable
  .just(1, 1, 2, 2, 1, 3, 3, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> distinctObservable = sourceObservable.distinct();

distinctObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 3);

Однако, если мы хотим получить Observable, который испускает все элементы, испускаемые sourceObservable которые отличаются от своих непосредственных предшественников, мы можем использовать оператор DifferentUntilChanged:

Observable<Integer> sourceObservable = Observable
  .just(1, 1, 2, 2, 1, 3, 3, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> distinctObservable = sourceObservable.distinctUntilChanged();

distinctObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 1, 3, 1);

3.5. Оператор ignoreElements

Всякий раз, когда мы хотим игнорировать все элементы, испускаемые sourceObservable, мы можем просто использовать ignoreElements:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> ignoredObservable = sourceObservable.ignoreElements();

ignoredObservable.subscribe(subscriber);

subscriber.assertNoValues();

4. Операторы фильтрации времени

При работе с наблюдаемой последовательностью ось времени неизвестно, но иногда может быть полезно получить своевременные данные из последовательности.

С этой целью RxJava предлагает несколько методов, которые позволяют нам работать с Observable, используя также ось времени.

Прежде чем перейти к первому, давайте определим Observable с синхронизацией, который будет генерировать элемент каждую секунду:

TestScheduler testScheduler = new TestScheduler();

Observable<Integer> timedObservable = Observable
  .just(1, 2, 3, 4, 5, 6)
  .zipWith(Observable.interval(
    0, 1, TimeUnit.SECONDS, testScheduler), (item, time) -> item);

TestScheduler — это специальный планировщик, который позволяет вручную переводить часы в любом темпе, который мы предпочитаем.

4.1. Операторы Sample и ThrottleLast

Оператор Sample фильтрует timedObservable, возвращая Observable, который выдает самые последние элементы, выдаваемые этим API в течение временных интервалов периода.

Давайте посмотрим, как мы можем попробовать timedObservable, фильтруя только последний испускаемый элемент каждые 2,5 секунды:

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> sampledObservable = timedObservable
  .sample(2500L, TimeUnit.MILLISECONDS, testScheduler);

sampledObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertValues(3, 5, 6);

Такого поведения можно добиться также с помощью оператора ThrottleLast.

4.2. Оператор ThrottleFirst

Оператор ThrottleFirst отличается от threadLast/sample тем, что он выдает первый элемент, выдаваемый timedObservable в каждом периоде выборки, а не самый последний выданный элемент.

Давайте посмотрим, как мы можем выдать первые элементы, используя период выборки 4 секунды:

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable
  .throttleFirst(4100L, TimeUnit.SECONDS, testScheduler);

filteredObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertValues(1, 6);

4.3. Операторы debounce и ThrottleWithTimeout

С помощью оператора debounce можно выдать элемент только в том случае, если в течение определенного промежутка времени не был выдан другой элемент.

Поэтому, если мы выберем временной интервал, превышающий временной интервал между испускаемыми элементами timedObservable, он будет испускать только последний. С другой стороны, если он меньше, он будет испускать все элементы, испускаемые timedObservable.

Давайте посмотрим, что происходит в первом сценарии:

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable
  .debounce(2000L, TimeUnit.MILLISECONDS, testScheduler);

filteredObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertValue(6);

Такого поведения также можно добиться, используя дроссельWithTimeout.

4.4. Оператор тайм-аута

Оператор тайм-аута отражает исходный Observable, но выдает ошибку уведомления, прерывая выпуск элементов, если исходный Observable не может выдать какие-либо элементы в течение указанного интервала времени.

Давайте посмотрим, что произойдет, если мы укажем тайм-аут в 500 миллисекунд для нашего timedObservable:

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable
  .timeout(500L, TimeUnit.MILLISECONDS, testScheduler);

filteredObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertError(TimeoutException.class); subscriber.assertValues(1);

5. Множественная фильтрация Observable

При работе с Observable определенно можно решить, следует ли фильтровать или пропускать элементы на основе второй наблюдаемый.

Прежде чем двигаться дальше, давайте определим delayedObservable, который будет выдавать только 1 элемент через 3 секунды:

Observable<Integer> delayedObservable = Observable.just(1)
  .delay(3, TimeUnit.SECONDS, testScheduler);

Начнем с оператора takeUntil.

5.1. Оператор takeUntil

Оператор takeUntil отбрасывает любой элемент, испускаемый исходным Observable (timedObservable) после того, как второй Observable (delayedObservable) испускает элемент или завершает работу:

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable
  .skipUntil(delayedObservable);

filteredObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertValues(4, 5, 6);

5.2. Оператор skipUntil

С другой стороны, skipUntil отбрасывает любой элемент, созданный исходным Observable (timedObservable), до тех пор, пока второй Observable (delayedObservable) не создаст элемент:

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable
  .takeUntil(delayedObservable);

filteredObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertValues(1, 2, 3);

6. Заключение

В этом подробном руководстве , мы изучили различные операторы фильтрации, доступные в RxJava, и предоставили простой пример каждого из них.

Как всегда, все примеры кода в этой статье можно найти на GitHub.