«1. Введение

Популярность RxJava привела к созданию множества сторонних библиотек, расширяющих ее функциональность.

Многие из этих библиотек были ответом на типичные проблемы, с которыми сталкивались разработчики при использовании RxJava. RxRelay — одно из таких решений.

2. Работа с субъектом

Проще говоря, субъект действует как мост между наблюдаемым и наблюдателем. Поскольку это Observer, он может подписаться на один или несколько Observable и получать от них события.

Кроме того, учитывая, что он одновременно является Observable, он может повторно отправлять события или отправлять новые события своим подписчикам. Более подробную информацию о предмете можно найти в этой статье.

Одна из проблем с Subject заключается в том, что после получения onComplete() или onError() он больше не может перемещать данные. Иногда это желаемое поведение, а иногда нет.

В тех случаях, когда такое поведение нежелательно, следует рассмотреть возможность использования RxRelay.

3. Relay

Relay — это, по сути, Subject, но без возможности вызывать onComplete() и onError(), поэтому он может постоянно выдавать данные.

Это позволяет нам создавать мосты между различными типами API, не беспокоясь о случайном срабатывании состояния терминала.

Чтобы использовать RxRelay, нам нужно добавить в наш проект следующую зависимость:

<dependency>
  <groupId>com.jakewharton.rxrelay2</groupId>
  <artifactId>rxrelay</artifactId>
  <version>1.2.0</version>
</dependency>

4. Типы реле

В библиотеке доступны три различных типа реле. Мы быстро рассмотрим все три здесь.

4.1. PublishRelay

Ретранслятор этого типа будет повторно отправлять все события после того, как наблюдатель подпишется на него.

События будут отправлены всем подписчикам:

public void whenObserverSubscribedToPublishRelay_itReceivesEmittedEvents() {
    PublishRelay<Integer> publishRelay = PublishRelay.create();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    
    publishRelay.subscribe(firstObserver);
    firstObserver.assertSubscribed();
    publishRelay.accept(5);
    publishRelay.accept(10);
    publishRelay.subscribe(secondObserver);
    secondObserver.assertSubscribed();
    publishRelay.accept(15);
    firstObserver.assertValues(5, 10, 15);
    
    // second receives only the last event
    secondObserver.assertValue(15);
}

В этом случае нет буферизации событий, поэтому такое поведение похоже на холодный Observable.

4.2. BehaviorRelay

Этот тип Relay будет повторно передавать самое последнее наблюдаемое событие и все последующие события после того, как Observer подпишется:

public void whenObserverSubscribedToBehaviorRelay_itReceivesEmittedEvents() {
    BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    behaviorRelay.accept(5);     
    behaviorRelay.subscribe(firstObserver);
    behaviorRelay.accept(10);
    behaviorRelay.subscribe(secondObserver);
    behaviorRelay.accept(15);
    firstObserver.assertValues(5, 10, 15);
    secondObserver.assertValues(10, 15);
}

Когда мы создаем BehaviorRelay, мы можем указать значение по умолчанию, которое будет отправлено, если нет других событий для генерации.

Чтобы указать значение по умолчанию, мы можем использовать метод createDefault():

public void whenObserverSubscribedToBehaviorRelay_itReceivesDefaultValue() {
    BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.createDefault(1);
    TestObserver<Integer> firstObserver = new TestObserver<>();
    behaviorRelay.subscribe(firstObserver);
    firstObserver.assertValue(1);
}

Если мы не хотим указывать значение по умолчанию, мы можем использовать метод create():

public void whenObserverSubscribedToBehaviorRelayWithoutDefaultValue_itIsEmpty() {
    BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
    TestObserver<Integer> firstObserver = new TestObserver<>();
    behaviorRelay.subscribe(firstObserver);
    firstObserver.assertEmpty();
}

4.3 . ReplayRelay

Этот тип Relay буферизует все полученные им события, а затем повторно передает их всем подписчикам, подписавшимся на него:

 public void whenObserverSubscribedToReplayRelay_itReceivesEmittedEvents() {
    ReplayRelay<Integer> replayRelay = ReplayRelay.create();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    replayRelay.subscribe(firstObserver);
    replayRelay.accept(5);
    replayRelay.accept(10);
    replayRelay.accept(15);
    replayRelay.subscribe(secondObserver);
    firstObserver.assertValues(5, 10, 15);
    secondObserver.assertValues(5, 10, 15);
}

Все элементы буферизуются, и все подписчики будут получать одни и те же события, так что это поведение аналогично к холодному Наблюдаемому.

Когда мы создаем ReplayRelay, мы можем указать максимальный размер буфера и время жизни для событий.

Чтобы создать Relay с ограниченным размером буфера, мы можем использовать метод createWithSize(). Когда количество событий, подлежащих буферизации, превышает установленный размер буфера, предыдущие элементы будут отброшены:

public void whenObserverSubscribedToReplayRelayWithLimitedSize_itReceivesEmittedEvents() {
    ReplayRelay<Integer> replayRelay = ReplayRelay.createWithSize(2);
    TestObserver<Integer> firstObserver = TestObserver.create();
    replayRelay.accept(5);
    replayRelay.accept(10);
    replayRelay.accept(15);
    replayRelay.accept(20);
    replayRelay.subscribe(firstObserver);
    firstObserver.assertValues(15, 20);
}

Мы также можем создать ReplayRelay с максимальным временем ожидания для буферизованных событий, используя метод createWithTime():

public void whenObserverSubscribedToReplayRelayWithMaxAge_thenItReceivesEmittedEvents() {
    SingleScheduler scheduler = new SingleScheduler();
    ReplayRelay<Integer> replayRelay =
      ReplayRelay.createWithTime(2000, TimeUnit.MILLISECONDS, scheduler);
    long current =  scheduler.now(TimeUnit.MILLISECONDS);
    TestObserver<Integer> firstObserver = TestObserver.create();
    replayRelay.accept(5);
    replayRelay.accept(10);
    replayRelay.accept(15);
    replayRelay.accept(20);
    Thread.sleep(3000);
    replayRelay.subscribe(firstObserver);
    firstObserver.assertEmpty();
}

5. Custom Relay

Все типы, описанные выше, расширяют общий абстрактный класс Relay, это дает нам возможность написать собственный класс Relay.

Чтобы создать собственное реле, нам нужно реализовать три метода: accept(), hasObservers() и subscribeActual().

Давайте напишем простую ретрансляцию, которая будет пересылать событие одному из случайно выбранных подписчиков:

public class RandomRelay extends Relay<Integer> {
    Random random = new Random();

    List<Observer<? super Integer>> observers = new ArrayList<>();

    @Override
    public void accept(Integer integer) {
        int observerIndex = random.nextInt() % observers.size();
        observers.get(observerIndex).onNext(integer);
    }

    @Override
    public boolean hasObservers() {
        return observers.isEmpty();
    }

    @Override
    protected void subscribeActual(Observer<? super Integer> observer) {
        observers.add(observer);
        observer.onSubscribe(Disposables.fromRunnable(
          () -> System.out.println("Disposed")));
    }
}

Теперь мы можем проверить, что только один подписчик получит событие:

public void whenTwoObserversSubscribedToRandomRelay_thenOnlyOneReceivesEvent() {
    RandomRelay randomRelay = new RandomRelay();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    randomRelay.subscribe(firstObserver);
    randomRelay.subscribe(secondObserver);
    randomRelay.accept(5);
    if(firstObserver.values().isEmpty()) {
        secondObserver.assertValue(5);
    } else {
        firstObserver.assertValue(5);
        secondObserver.assertEmpty();
    }
}

6. Заключение ~ ~~ В этом уроке мы рассмотрели RxRelay, тип, похожий на Subject, но без возможности активировать терминальное состояние.

Дополнительную информацию можно найти в документации. И, как всегда, все образцы кода можно найти на GitHub.

«