«1. Обзор

В этом руководстве мы узнаем, как преобразовать традиционные синхронные и асинхронные API в Observables с помощью операторов RxJava2.

Мы создадим несколько простых функций, которые помогут нам подробно обсудить эти операторы.

2. Зависимости Maven

Во-первых, мы должны добавить RxJava2 и RxJava2Extensions в качестве зависимостей Maven:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.2</version>
</dependency>
<dependency>
    <groupId>com.github.akarnokd</groupId>
    <artifactId>rxjava2-extensions</artifactId>
    <version>0.20.4</version>
</dependency>

3. Операторы

RxJava2 определяет множество операторов для различных вариантов использования реактивного программирования .

Но мы обсудим только несколько операторов, которые обычно используются для преобразования синхронных или асинхронных методов в Observable в зависимости от их природы. Эти операторы принимают функции в качестве аргументов и выдают значение, возвращаемое этой функцией.

Наряду с обычными операторами, RxJava2 определяет еще несколько операторов для расширенных функций.

Давайте рассмотрим, как мы можем использовать эти операторы для преобразования синхронных и асинхронных методов.

4. Синхронное преобразование методов

4.1. Использование fromCallable()

Этот оператор возвращает Observable, который, когда подписчик подписывается на него, вызывает функцию, переданную в качестве аргумента, а затем выдает значение, возвращенное этой функцией. Давайте создадим функцию, которая возвращает целое число, и преобразуем ее:

AtomicInteger counter = new AtomicInteger();
Callable<Integer> callable = () -> counter.incrementAndGet();

Теперь давайте превратим ее в Observable и протестируем, подписавшись на нее:

Observable<Integer> source = Observable.fromCallable(callable);

for (int i = 1; i < 5; i++) {
    source.test()
      .awaitDone(5, TimeUnit.SECONDS)
      .assertResult(i);
    assertEquals(i, counter.get());
}

Оператор fromCallable() лениво выполняет указанную функцию каждый раз, когда обернутый Observable подписывается. Чтобы проверить это поведение, мы создали несколько подписчиков с помощью цикла.

Так как реактивные потоки по умолчанию асинхронны, подписчик вернется немедленно. В большинстве практических сценариев вызываемая функция будет иметь некоторую задержку для завершения своего выполнения. Итак, мы добавили максимальное время ожидания в пять секунд перед проверкой результата нашей вызываемой функции.

Также обратите внимание, что мы использовали метод test() Observable. Этот метод удобен при тестировании Observables. Он создает TestObserver и подписывается на наш Observable.

4.2. Использование start()

Оператор start() является частью модуля RxJava2Extension. Он вызовет указанную функцию асинхронно и вернет Observable, который выдает результат:

Observable<Integer> source = AsyncObservable.start(callable);

for (int i = 1; i < 5; i++) {
    source.test()
      .awaitDone(5, TimeUnit.SECONDS)
      .assertResult(1);
    assertEquals(1, counter.get());
}

Функция вызывается немедленно, а не всякий раз, когда подписчик подписывается на результирующий Observable. Несколько подписок на этот наблюдаемый наблюдают одно и то же возвращаемое значение.

5. Преобразование асинхронных методов

5.1. Использование fromFuture()

Как мы знаем, наиболее распространенным способом создания асинхронного метода в Java является использование реализации Future. Метод fromFuture принимает Future в качестве аргумента и выдает значение, полученное из метода Future.get().

Во-первых, давайте сделаем функцию, которую мы создали ранее, асинхронной:

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(callable);

Далее, давайте проверим ее преобразованием:

Observable<Integer> source = Observable.fromFuture(future);

for (int i = 1; i < 5; i++) {
    source.test()
      .awaitDone(5, TimeUnit.SECONDS)
      .assertResult(1);
    assertEquals(1, counter.get());
}
executor.shutdown();

И еще раз заметим, что каждая подписка наблюдает один и тот же результат ценность.

Теперь метод dispose() в Observable действительно полезен, когда речь идет о предотвращении утечки памяти. Но в этом случае это не отменит future из-за блокирующего характера Future.get().

Итак, мы можем гарантировать отмену будущего, объединив функцию doOnDispose() исходного наблюдаемого объекта и метод отмены для будущего:

source.doOnDispose(() -> future.cancel(true));

5.2. Использование startFuture()

Как следует из названия, этот оператор немедленно запустит указанное Future и выдаст возвращаемое значение, когда подписчик подпишется на него. В отличие от оператора fromFuture, который кэширует результат для следующего использования, этот оператор будет выполнять асинхронный метод каждый раз, когда на него подписывается:

ExecutorService executor = Executors.newSingleThreadExecutor();
Observable<Integer> source = AsyncObservable.startFuture(() -> executor.submit(callable));

for (int i = 1; i < 5; i++) {
    source.test()
      .awaitDone(5, TimeUnit.SECONDS)
      .assertResult(i);
    assertEquals(i, counter.get());
}
executor.shutdown();

5.3. Использование deferFuture()

Этот оператор объединяет несколько Observable, возвращаемых методом Future, и возвращает поток возвращаемых значений, полученных от каждого Observable. Это запустит переданную асинхронную фабричную функцию всякий раз, когда подписывается новый подписчик.

Итак, давайте сначала создадим асинхронную фабричную функцию:

List<Integer> list = Arrays.asList(new Integer[] { counter.incrementAndGet(), 
  counter.incrementAndGet(), counter.incrementAndGet() });
ExecutorService exec = Executors.newSingleThreadExecutor();
Callable<Observable<Integer>> callable = () -> Observable.fromIterable(list);

А затем мы можем провести быстрый тест:

Observable<Integer> source = AsyncObservable.deferFuture(() -> exec.submit(callable));
for (int i = 1; i < 4; i++) {
    source.test()
      .awaitDone(5, TimeUnit.SECONDS)
      .assertResult(1,2,3);
}
exec.shutdown();

«

«6. Заключение

В этом уроке мы узнали, как преобразовывать синхронные и асинхронные методы в наблюдаемые объекты RxJava2.

Конечно, приведенные здесь примеры являются базовыми реализациями. Но мы можем использовать RxJava2 для более сложных приложений, таких как потоковое видео и приложения, в которых нам нужно отправлять большие объемы данных порциями.