«1. Обзор

В этом уроке мы собираемся узнать о хуках RxJava. Мы создадим короткие примеры, чтобы продемонстрировать, как хуки работают в разных ситуациях.

2. Что такое хуки RxJava?

Как видно из названия, хуки RxJava позволяют нам подключиться к жизненному циклу Observable, Completable, Maybe, Flowable и Single. Кроме того, RxJava позволяет нам добавлять хуки жизненного цикла к планировщикам, возвращаемым планировщиками. Кроме того, мы можем указать глобальный обработчик ошибок, также используя хуки.

В RxJava 1 класс RxJavaHooks используется для определения хуков. Но механизм перехвата полностью переписан в RxJava 2. Теперь класс RxJavaHooks больше не доступен для определения перехватчиков. Вместо этого мы должны использовать RxJavaPlugins для реализации хуков жизненного цикла.

Класс RxJavaPlugins имеет несколько методов установки для установки ловушек. Эти крючки являются глобальными. После того, как они установлены, мы должны либо вызвать метод reset() класса RxJavaPlugins, либо вызвать метод установки для отдельного хука, чтобы удалить его.

3. Ловушка для обработки ошибок

Мы можем использовать метод setErrorHandler() для обработки ошибок, которые не могут быть сгенерированы, поскольку жизненный цикл нисходящего потока уже достиг конечного состояния. Давайте посмотрим, как мы можем реализовать обработчик ошибок и протестировать его:

RxJavaPlugins.setErrorHandler(throwable -> {
    hookCalled = true;
});

Observable.error(new IllegalStateException()).subscribe();

assertTrue(hookCalled);

Не все исключения генерируются как есть. Однако RxJava проверит, является ли выданная ошибка одним из уже названных случаев ошибки, которые должны пройти как есть, в противном случае она будет заключена в исключение UndeliverableException. Исключения, называемые ошибками:

    OnErrorNotImplementedException — когда пользователь забывает добавить обработчик onError в метод subscribe(). происходят нарушения NullPointerException — стандартное исключение нулевого указателя IllegalArgumentException — из-за недопустимого пользовательского ввода CompositeException — из-за сбоя при обработке исключения

4. Хуки для Completable

RxJava Completable имеет две ловушки жизненного цикла. Давайте посмотрим на них сейчас.

4.1. setOnCompletableAssembly

RxJava будет вызывать этот хук при создании экземпляров операторов и источников в Completable. Мы можем использовать текущий объект Completable, переданный в качестве аргумента функции ловушки, для любой операции с ним:

RxJavaPlugins.setOnCompletableAssembly(completable -> {
    hookCalled = true;
    return completable;
});

Completable.fromSingle(Single.just(1));

assertTrue(hookCalled);

4.2. setOnCompletableSubscribe

RxJava вызывает этот хук до того, как подписчик подпишется на Completable:

RxJavaPlugins.setOnCompletableSubscribe((completable, observer) -> {
    hookCalled = true;
    return observer;
});

Completable.fromSingle(Single.just(1)).test();

assertTrue(hookCalled);

5. Хуки для Observable

Далее давайте рассмотрим три хука жизненного цикла RxJava для Observable.

5.1. setOnObservableAssembly

RxJava вызывает этот хук, когда создает экземпляры операторов и источников в Observable:

RxJavaPlugins.setOnObservableAssembly(observable -> {
    hookCalled = true;
    return observable;
});

Observable.range(1, 10);

assertTrue(hookCalled);

5.2. setOnObservableSubscribe

RxJava вызывает этот хук до того, как подписчик подпишется на Observable:

RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> {
    hookCalled = true;
    return observer;
});

Observable.range(1, 10).test();

assertTrue(hookCalled);

5.3. setOnConnectableObservableAssembly

Этот хук предназначен для ConnectableObservable. ConnectableObservable — это вариант самого Observable. Единственная разница в том, что он не начинает генерировать элементы, когда на него подписаны, а только когда вызывается его метод connect():

RxJavaPlugins.setOnConnectableObservableAssembly(connectableObservable -> {
    hookCalled = true;
    return connectableObservable;
});

ConnectableObservable.range(1, 10).publish().connect();

assertTrue(hookCalled);

6. Хуки для Flowable

Теперь давайте взглянем на хуки жизненного цикла, определенные для Flowable.

6.1. setOnFlowableAssembly

RxJava вызывает этот хук, когда создает экземпляры операторов и источников в Flowable:

RxJavaPlugins.setOnFlowableAssembly(flowable -> {
    hookCalled = true;
    return flowable;
});

Flowable.range(1, 10);

assertTrue(hookCalled);

6.2. setOnFlowableSubscribe

RxJava вызывает этот хук до того, как подписчик подпишется на Flowable:

RxJavaPlugins.setOnFlowableSubscribe((flowable, observer) -> {
    hookCalled = true;
    return observer;
});

Flowable.range(1, 10).test();

assertTrue(hookCalled);

6.3. setOnConnectableFlowableAssembly

RxJava вызывает этот хук, когда создает экземпляры операторов и источников в ConnectableFlowable. Как и ConnectableObservable, ConnectableFlowable также начинает выдавать элементы только тогда, когда мы вызываем его метод connect():

RxJavaPlugins.setOnConnectableFlowableAssembly(connectableFlowable -> {
    hookCalled = true;
    return connectableFlowable;
});

ConnectableFlowable.range(1, 10).publish().connect();

assertTrue(hookCalled);

6.4. setOnParallelAssembly

ParallelFlowable предназначен для достижения параллелизма между несколькими издателями. RxJava вызывает хук setOnParallelAssembly() при создании экземпляров операторов и источников в ParallelFlowable:

RxJavaPlugins.setOnParallelAssembly(parallelFlowable -> {
    hookCalled = true;
    return parallelFlowable;
});

Flowable.range(1, 10).parallel();

assertTrue(hookCalled);

7. Хуки для Maybe

«Эмиттер Maybe имеет два хука, определенных для управления его жизненным циклом.

7.1. setOnMaybeAssembly

RxJava вызывает этот хук, когда создает экземпляры операторов и источников на Maybe:

RxJavaPlugins.setOnMaybeAssembly(maybe -> {
    hookCalled = true;
    return maybe;
});

Maybe.just(1);

assertTrue(hookCalled);

7.2. setOnMaybeSubscribe

RxJava вызывает этот хук до того, как подписчик подпишется на Maybe:

RxJavaPlugins.setOnMaybeSubscribe((maybe, observer) -> {
    hookCalled = true;
    return observer;
});

Maybe.just(1).test();

assertTrue(hookCalled);

8. Хуки для Single

RxJava также определяет два основных хука для Single излучателя.

8.1. setOnSingleAssembly

RxJava вызывает этот хук, когда создает экземпляры операторов и источников в Single:

RxJavaPlugins.setOnSingleAssembly(single -> {
    hookCalled = true;
    return single;
});

Single.just(1);

assertTrue(hookCalled);

8.2. setOnSingleSubscribe

RxJava вызывает этот хук до того, как подписчик подпишется на Single:

RxJavaPlugins.setOnSingleSubscribe((single, observer) -> {
    hookCalled = true;
    return observer;
});

Single.just(1).test();

assertTrue(hookCalled);

9. Хуки для планировщиков

Как и эмиттеры RxJava, планировщики также имеют набор хуков для управления их жизненным циклом. RxJava определяет общий хук, который вызывается, когда мы используем планировщик любого типа. Кроме того, можно реализовать хуки, специфичные для разных планировщиков.

9.1. setScheduleHandler

RxJava вызывает этот хук, когда мы используем любой из планировщиков для операции:

RxJavaPlugins.setScheduleHandler((runnable) -> {
    hookCalled = true;
    return runnable;
});

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.single())
  .test();

hookCalled = false;

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.computation())
  .test();

assertTrue(hookCalled);

Так как мы повторили операцию с планировщиками single() и computing(), когда мы запускаем это, тест case дважды напечатает сообщение в консоли.

9.2. Ловушки для планировщика вычислений

Планировщик вычислений имеет две ловушки, а именно, setInitComputationSchedulerHandler и setComputationSchedulerHandler.

Когда RxJava инициализирует планировщик вычислений, он вызывает ловушку, которую мы установили с помощью функции setInitComputationSchedulerHandler. Более того, он вызывает хук, который мы установили с помощью функции setComputationSchedulerHandler, когда мы планируем задачу с помощью Schedulers.computation():

RxJavaPlugins.setInitComputationSchedulerHandler((scheduler) -> {
    initHookCalled = true;
    return scheduler.call();
});

RxJavaPlugins.setComputationSchedulerHandler((scheduler) -> {
    hookCalled = true;
    return scheduler;
});

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.computation())
  .test();

assertTrue(hookCalled && initHookCalled);

9.3. Перехватчики для планировщика ввода-вывода

Планировщик ввода-вывода также имеет два перехватчика, а именно, setInitIoSchedulerHandler и setIoSchedulerHandler:

RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> {
    initHookCalled = true;
    return scheduler.call();
});

RxJavaPlugins.setIoSchedulerHandler((scheduler) -> {
    hookCalled = true;
    return scheduler;
});

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.io())
  .test();

assertTrue(hookCalled && initHookCalled);

9.4. Хуки для одиночного планировщика

Теперь давайте посмотрим на хуки для одиночного планировщика:

RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> {
    initHookCalled = true;
    return scheduler.call();
});

RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> {
    hookCalled = true;
    return scheduler;
});

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.single())
  .test();

assertTrue(hookCalled && initHookCalled);

9.5. Хуки для планировщика NewThread

Как и другие планировщики, планировщик NewThread также определяет два хука:

RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> {
    initHookCalled = true;
    return scheduler.call();
});

RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> {
    hookCalled = true;
    return scheduler;
});

Observable.range(1, 15)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.newThread())
  .test();

assertTrue(hookCalled && initHookCalled);

10. Заключение

В этом руководстве мы узнали, что такое различные хуки жизненного цикла RxJava и как мы можем их реализовать. их. Среди этих хуков наиболее примечательным является хук обработки ошибок. Но мы можем использовать другие для целей аудита, например, для регистрации количества подписчиков и других конкретных случаев использования.

И, как обычно, все короткие примеры, которые мы здесь обсуждали, можно найти на GitHub.