«1. Обзор
В этом руководстве мы узнаем, как преобразовать традиционные синхронные и асинхронные API в Observables с помощью операторов RxJava2.
Мы создадим несколько простых функций, которые помогут нам подробно обсудить эти операторы.
2. Зависимости Maven
Во-первых, мы должны добавить RxJava2 и RxJava2Extensions в качестве зависимостей Maven:
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>com.github.akarnokd</groupId>
<artifactId>rxjava2-extensions</artifactId>
<version>0.20.4</version>
</dependency>
3. Операторы
RxJava2 определяет множество операторов для различных вариантов использования реактивного программирования .
Но мы обсудим только несколько операторов, которые обычно используются для преобразования синхронных или асинхронных методов в Observable в зависимости от их природы. Эти операторы принимают функции в качестве аргументов и выдают значение, возвращаемое этой функцией.
Наряду с обычными операторами, RxJava2 определяет еще несколько операторов для расширенных функций.
Давайте рассмотрим, как мы можем использовать эти операторы для преобразования синхронных и асинхронных методов.
4. Синхронное преобразование методов
4.1. Использование fromCallable()
Этот оператор возвращает Observable, который, когда подписчик подписывается на него, вызывает функцию, переданную в качестве аргумента, а затем выдает значение, возвращенное этой функцией. Давайте создадим функцию, которая возвращает целое число, и преобразуем ее:
AtomicInteger counter = new AtomicInteger();
Callable<Integer> callable = () -> counter.incrementAndGet();
Теперь давайте превратим ее в Observable и протестируем, подписавшись на нее:
Observable<Integer> source = Observable.fromCallable(callable);
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(i);
assertEquals(i, counter.get());
}
Оператор fromCallable() лениво выполняет указанную функцию каждый раз, когда обернутый Observable подписывается. Чтобы проверить это поведение, мы создали несколько подписчиков с помощью цикла.
Так как реактивные потоки по умолчанию асинхронны, подписчик вернется немедленно. В большинстве практических сценариев вызываемая функция будет иметь некоторую задержку для завершения своего выполнения. Итак, мы добавили максимальное время ожидания в пять секунд перед проверкой результата нашей вызываемой функции.
Также обратите внимание, что мы использовали метод test() Observable. Этот метод удобен при тестировании Observables. Он создает TestObserver и подписывается на наш Observable.
4.2. Использование start()
Оператор start() является частью модуля RxJava2Extension. Он вызовет указанную функцию асинхронно и вернет Observable, который выдает результат:
Observable<Integer> source = AsyncObservable.start(callable);
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
assertEquals(1, counter.get());
}
Функция вызывается немедленно, а не всякий раз, когда подписчик подписывается на результирующий Observable. Несколько подписок на этот наблюдаемый наблюдают одно и то же возвращаемое значение.
5. Преобразование асинхронных методов
5.1. Использование fromFuture()
Как мы знаем, наиболее распространенным способом создания асинхронного метода в Java является использование реализации Future. Метод fromFuture принимает Future в качестве аргумента и выдает значение, полученное из метода Future.get().
Во-первых, давайте сделаем функцию, которую мы создали ранее, асинхронной:
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(callable);
Далее, давайте проверим ее преобразованием:
Observable<Integer> source = Observable.fromFuture(future);
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
assertEquals(1, counter.get());
}
executor.shutdown();
И еще раз заметим, что каждая подписка наблюдает один и тот же результат ценность.
Теперь метод dispose() в Observable действительно полезен, когда речь идет о предотвращении утечки памяти. Но в этом случае это не отменит future из-за блокирующего характера Future.get().
Итак, мы можем гарантировать отмену будущего, объединив функцию doOnDispose() исходного наблюдаемого объекта и метод отмены для будущего:
source.doOnDispose(() -> future.cancel(true));
5.2. Использование startFuture()
Как следует из названия, этот оператор немедленно запустит указанное Future и выдаст возвращаемое значение, когда подписчик подпишется на него. В отличие от оператора fromFuture, который кэширует результат для следующего использования, этот оператор будет выполнять асинхронный метод каждый раз, когда на него подписывается:
ExecutorService executor = Executors.newSingleThreadExecutor();
Observable<Integer> source = AsyncObservable.startFuture(() -> executor.submit(callable));
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(i);
assertEquals(i, counter.get());
}
executor.shutdown();
5.3. Использование deferFuture()
Этот оператор объединяет несколько Observable, возвращаемых методом Future, и возвращает поток возвращаемых значений, полученных от каждого Observable. Это запустит переданную асинхронную фабричную функцию всякий раз, когда подписывается новый подписчик.
Итак, давайте сначала создадим асинхронную фабричную функцию:
List<Integer> list = Arrays.asList(new Integer[] { counter.incrementAndGet(),
counter.incrementAndGet(), counter.incrementAndGet() });
ExecutorService exec = Executors.newSingleThreadExecutor();
Callable<Observable<Integer>> callable = () -> Observable.fromIterable(list);
А затем мы можем провести быстрый тест:
Observable<Integer> source = AsyncObservable.deferFuture(() -> exec.submit(callable));
for (int i = 1; i < 4; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1,2,3);
}
exec.shutdown();
«
«6. Заключение
В этом уроке мы узнали, как преобразовывать синхронные и асинхронные методы в наблюдаемые объекты RxJava2.
Конечно, приведенные здесь примеры являются базовыми реализациями. Но мы можем использовать RxJava2 для более сложных приложений, таких как потоковое видео и приложения, в которых нам нужно отправлять большие объемы данных порциями.