«1. Обзор

Поведение нескольких подписчиков по умолчанию не всегда желательно. В этой статье мы расскажем, как изменить это поведение и правильно обрабатывать несколько подписчиков.

Но сначала давайте посмотрим на поведение нескольких подписчиков по умолчанию.

2. Поведение по умолчанию

Допустим, у нас есть следующий Observable:

private static Observable getObservable() {
    return Observable.create(subscriber -> {
        subscriber.onNext(gettingValue(1));
        subscriber.onNext(gettingValue(2));

        subscriber.add(Subscriptions.create(() -> {
            LOGGER.info("Clear resources");
        }));
    });
}

Этот объект генерирует два элемента, как только подписчики подписываются.

В нашем примере у нас есть два подписчика:

LOGGER.info("Subscribing");

Subscription s1 = obs.subscribe(i -> LOGGER.info("subscriber#1 is printing " + i));
Subscription s2 = obs.subscribe(i -> LOGGER.info("subscriber#2 is printing " + i));

s1.unsubscribe();
s2.unsubscribe();

Представьте, что получение каждого элемента является затратной операцией — она может включать, например, интенсивные вычисления или открытие URL-соединения.

Для простоты мы просто вернем число:

private static Integer gettingValue(int i) {
    LOGGER.info("Getting " + i);
    return i;
}

Вот результат:

Subscribing
Getting 1
subscriber#1 is printing 1
Getting 2
subscriber#1 is printing 2
Getting 1
subscriber#2 is printing 1
Getting 2
subscriber#2 is printing 2
Clear resources
Clear resources

Как мы видим, получение каждого элемента, а также очистка ресурсов выполняется дважды по умолчанию. – один раз для каждого Абонента. Это не то, чего мы хотим. Класс ConnectableObservable помогает решить проблему.

3. ConnectableObservable

Класс ConnectableObservable позволяет разделить подписку с несколькими подписчиками и не выполнять базовые операции несколько раз.

Но сначала давайте создадим ConnectableObservable.

3.1. publish()

Метод publish() создает ConnectableObservable из Observable:

ConnectableObservable obs = Observable.create(subscriber -> {
    subscriber.onNext(gettingValue(1));
    subscriber.onNext(gettingValue(2));
    subscriber.add(Subscriptions.create(() -> {
        LOGGER.info("Clear resources");
    }));
}).publish();

Но пока он ничего не делает. Что заставляет его работать, так это метод connect().

3.2. connect()

До тех пор, пока не будет вызван метод connect() ConnectableObservable, обратный вызов onSubscribe() Observable не запускается, даже если есть несколько подписчиков.

Давайте продемонстрируем это:

LOGGER.info("Subscribing");
obs.subscribe(i -> LOGGER.info("subscriber #1 is printing " + i));
obs.subscribe(i -> LOGGER.info("subscriber #2 is printing " + i));
Thread.sleep(1000);
LOGGER.info("Connecting");
Subscription s = obs.connect();
s.unsubscribe();

Мы подписываемся, а затем ждем секунду перед подключением. Вывод:

Subscribing
Connecting
Getting 1
subscriber #1 is printing 1
subscriber #2 is printing 1
Getting 2
subscriber #1 is printing 2
subscriber #2 is printing 2
Clear resources

Как мы видим:

    Получение элементов происходит только один раз, как мы и хотели. Очистка ресурсов также происходит только один раз. Получение элементов начинается через секунду после подписки. Подписка больше не вызывает испускание элементов. Только connect() делает это

Эта задержка может быть полезной — иногда нам нужно предоставить всем подписчикам одинаковую последовательность элементов, даже если один из них подписывается раньше, чем другой.

3.3. Непротиворечивое представление Observables — connect() After subscribe()

Этот вариант использования не может быть продемонстрирован на нашем предыдущем Observable, так как он работает в холодном состоянии, и оба подписчика все равно получают всю последовательность элементов.

Вместо этого представьте, что генерация элемента не зависит от момента подписки, например, события, генерируемые щелчком мыши. Теперь также представьте, что второй подписчик подписывается через секунду после первого.

Первый подписчик получит все элементы, испускаемые в этом примере, тогда как второй подписчик получит только некоторые элементы.

С другой стороны, использование метода connect() в нужном месте может дать обоим подписчикам одинаковое представление об наблюдаемой последовательности.

Пример Hot Observable

Давайте создадим Hot Observable. Он будет испускать элементы при щелчке мышью на JFrame.

Каждый элемент будет x-координатой клика:

private static Observable getObservable() {
    return Observable.create(subscriber -> {
        frame.addMouseListener(new MouseAdapter() {
            @Override
            public void mouseClicked(MouseEvent e) {
                subscriber.onNext(e.getX());
            }
        });
        subscriber.add(Subscriptions.create(() {
            LOGGER.info("Clear resources");
            for (MouseListener listener : frame.getListeners(MouseListener.class)) {
                frame.removeMouseListener(listener);
            }
        }));
    });
}

Поведение Hot Observable по умолчанию

Теперь если мы подпишем двух подписчиков друг за другом с интервалом в секунду, запустим программу и начнем кликать , мы увидим, что первый подписчик получит больше элементов: ConnectableObservable и вызовите connect() после подписки обоих подписчиков:

public static void defaultBehaviour() throws InterruptedException {
    Observable obs = getObservable();

    LOGGER.info("subscribing #1");
    Subscription subscription1 = obs.subscribe((i) -> 
        LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription subscription2 = obs.subscribe((i) -> 
        LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#1");
    subscription1.unsubscribe();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#2");
    subscription2.unsubscribe();
}
subscribing #1
subscriber#1 is printing x-coordinate 280
subscriber#1 is printing x-coordinate 242
subscribing #2
subscriber#1 is printing x-coordinate 343
subscriber#2 is printing x-coordinate 343
unsubscribe#1
clearing resources
unsubscribe#2
clearing resources

Теперь они получат одинаковую последовательность:

Итак, смысл в том, чтобы дождаться момента, когда все подписчики будут готовы и затем вызовите соединение().

public static void subscribeBeforeConnect() throws InterruptedException {

    ConnectableObservable obs = getObservable().publish();

    LOGGER.info("subscribing #1");
    Subscription subscription1 = obs.subscribe(
      i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription subscription2 = obs.subscribe(
      i ->  LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("connecting:");
    Subscription s = obs.connect();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe connected");
    s.unsubscribe();
}

В приложении Spring мы можем подписаться на все компоненты во время запуска приложения, например, и вызвать connect() в onApplicationEvent().

subscribing #1
subscribing #2
connecting:
subscriber#1 is printing x-coordinate 317
subscriber#2 is printing x-coordinate 317
subscriber#1 is printing x-coordinate 364
subscriber#2 is printing x-coordinate 364
unsubscribe connected
clearing resources

Но вернемся к нашему примеру; обратите внимание, что все клики перед методом connect() пропускаются. Если мы не хотим пропускать элементы, а наоборот обрабатывать их, мы можем поставить connect() раньше в коде и заставить Observable генерировать события в отсутствие какого-либо подписчика.

«3.4. Принудительная подписка в отсутствие какого-либо подписчика – connect() Перед подпиской()

Чтобы продемонстрировать это, давайте исправим наш пример:

Шаги относительно просты:

Во-первых, мы connect Затем мы ждем одну секунду и подписываемся на первого подписчика Наконец, мы ждем еще секунду и подписываемся на второго подписчика

public static void connectBeforeSubscribe() throws InterruptedException {
    ConnectableObservable obs = getObservable()
      .doOnNext(x -> LOGGER.info("saving " + x)).publish();
    LOGGER.info("connecting:");
    Subscription s = obs.connect();
    Thread.sleep(1000);
    LOGGER.info("subscribing #1");
    obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    Thread.sleep(1000);
    s.unsubscribe();
}

Обратите внимание, что мы добавили оператор doOnNext(). Например, здесь мы могли бы хранить элементы в базе данных, но в нашем коде мы просто печатаем «сохранение…».

    Если мы запустим код и начнем щелкать, мы увидим, что элементы создаются и обрабатываются сразу после вызова connect():

Если бы не было подписчиков, элементы все равно обрабатывались бы.

Таким образом, метод connect() начинает излучать и обрабатывать элементы независимо от того, подписан ли кто-то, как если бы существовал искусственный подписчик с пустым действием, которое использовало элементы.

connecting:
saving 306
saving 248
subscribing #1
saving 377
subscriber#1 is printing x-coordinate 377
saving 295
subscriber#1 is printing x-coordinate 295
saving 206
subscriber#1 is printing x-coordinate 206
subscribing #2
saving 347
subscriber#1 is printing x-coordinate 347
subscriber#2 is printing x-coordinate 347
clearing resources

А если подписываются какие-то настоящие подписчики, то этот искусственный посредник просто распространяет им элементы.

Для отписки искусственного Подписчика выполняем:

Где:

3.5. autoConnect()

s.unsubscribe();

Этот метод подразумевает, что connect() вызывается не до или после подписки, а автоматически при подписке первого подписчика.

Subscription s = obs.connect();

Используя этот метод, мы не можем сами вызвать connect(), так как возвращаемый объект является обычным Observable, который не имеет этого метода, но использует базовый ConnectableObservable:

Обратите внимание, что мы не можем также отписать искусственного Подписчика. Мы можем отписать всех настоящих подписчиков, но искусственный подписчик все равно будет обрабатывать события.

Чтобы понять это, давайте посмотрим, что происходит в конце после того, как последний подписчик отписался:

public static void autoConnectAndSubscribe() throws InterruptedException {
    Observable obs = getObservable()
    .doOnNext(x -> LOGGER.info("saving " + x)).publish().autoConnect();

    LOGGER.info("autoconnect()");
    Thread.sleep(1000);
    LOGGER.info("subscribing #1");
    Subscription s1 = obs.subscribe((i) -> 
        LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription s2 = obs.subscribe((i) -> 
        LOGGER.info("subscriber#2 is printing x-coordinate " + i));

    Thread.sleep(1000);
    LOGGER.info("unsubscribe 1");
    s1.unsubscribe();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe 2");
    s2.unsubscribe();
}

Как мы видим, очистка ресурсов не происходит, а сохранение элементов с помощью doOnNext() продолжается после второго отписка. Это означает, что искусственный подписчик не отписывается, а продолжает потреблять элементы.

3.6. refCount()

subscribing #1
saving 296
subscriber#1 is printing x-coordinate 296
saving 329
subscriber#1 is printing x-coordinate 329
subscribing #2
saving 226
subscriber#1 is printing x-coordinate 226
subscriber#2 is printing x-coordinate 226
unsubscribe 1
saving 268
subscriber#2 is printing x-coordinate 268
saving 234
subscriber#2 is printing x-coordinate 234
unsubscribe 2
saving 278
saving 268

refCount() аналогичен autoConnect() в том смысле, что соединение также происходит автоматически, как только подписывается первый подписчик.

В отличие от autoconnect() отключение также происходит автоматически, когда последний подписчик отказывается от подписки:

4. Заключение

public static void refCountAndSubscribe() throws InterruptedException {
    Observable obs = getObservable()
      .doOnNext(x -> LOGGER.info("saving " + x)).publish().refCount();

    LOGGER.info("refcount()");
    Thread.sleep(1000);
    LOGGER.info("subscribing #1");
    Subscription subscription1 = obs.subscribe(
      i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription subscription2 = obs.subscribe(
      i -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));

    Thread.sleep(1000);
    LOGGER.info("unsubscribe#1");
    subscription1.unsubscribe();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#2");
    subscription2.unsubscribe();
}
refcount()
subscribing #1
saving 265
subscriber#1 is printing x-coordinate 265
saving 338
subscriber#1 is printing x-coordinate 338
subscribing #2
saving 203
subscriber#1 is printing x-coordinate 203
subscriber#2 is printing x-coordinate 203
unsubscribe#1
saving 294
subscriber#2 is printing x-coordinate 294
unsubscribe#2
clearing resources

Класс ConnectableObservable помогает обрабатывать несколько подписчиков без особых усилий.

Его методы выглядят одинаково, но сильно меняют поведение подписчиков из-за тонкостей реализации, означающих, что даже порядок методов имеет значение.

Полный исходный код всех примеров, использованных в этой статье, можно найти в проекте GitHub.

«