«1. Обзор

В этой статье мы сосредоточимся на использовании Reactive Extensions (Rx) в Java для создания и использования последовательностей данных.

На первый взгляд API может выглядеть как Java 8 Streams, но на самом деле он гораздо более гибкий и плавный, что делает его мощной парадигмой программирования.

Если вы хотите узнать больше о RxJava, прочтите эту статью.

2. Настройка

Чтобы использовать RxJava в нашем проекте Maven, нам нужно добавить следующую зависимость в наш pom.xml:

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>${rx.java.version}</version>
</dependency>

Или, для проекта Gradle:

compile 'io.reactivex.rxjava:rxjava:x.y.z'

~~ ~ 3. Функциональные реактивные концепции

С одной стороны, функциональное программирование — это процесс создания программного обеспечения путем составления чистых функций, избегая общего состояния, изменяемых данных и побочных эффектов.

С другой стороны, реактивное программирование — это парадигма асинхронного программирования, связанная с потоками данных и распространением изменений.

Вместе функциональное реактивное программирование образует комбинацию функциональных и реактивных методов, которые могут представлять собой элегантный подход к программированию, управляемому событиями, со значениями, которые меняются со временем, и где потребитель реагирует на данные по мере их поступления. ~ ~~ Эта технология объединяет различные реализации своих основных принципов, некоторые авторы придумали документ, определяющий общий словарь для описания приложений нового типа.

3.1. Reactive Manifesto

Reactive Manifesto — это онлайн-документ, в котором излагаются высокие стандарты для приложений в индустрии разработки программного обеспечения. Проще говоря, реактивные системы:

Отзывчивые — системы должны реагировать своевременно Управляемые сообщениями — системы должны использовать асинхронную передачу сообщений между компонентами для обеспечения слабой связи Эластичные — системы должны оставаться отзывчивыми при высокой нагрузке Отказоустойчивость — системы должны оставаться отзывчивыми при сбое некоторых компонентов

    4. Наблюдаемые объекты

При работе с Rx необходимо понимать два ключевых типа:

Наблюдаемый представляет собой любой объект, который может получать данные из источника данных и чьи состояние может представлять интерес таким образом, что другие объекты могут зарегистрировать интерес. Наблюдатель — это любой объект, который желает получать уведомления об изменении состояния другого объекта

    Наблюдатель подписывается на последовательность Observable. Последовательность отправляет элементы наблюдателю по одному.

Наблюдатель обрабатывает каждый перед обработкой следующего. Если многие события приходят асинхронно, они должны быть сохранены в очереди или удалены.

В Rx наблюдатель никогда не будет вызываться с элементом не по порядку или вызываться до того, как обратный вызов вернется для предыдущего элемента.

4.1. Типы Observable

Существует два типа:

Неблокирующий — поддерживается асинхронное выполнение и разрешена отмена подписки в любой точке потока событий. В этой статье мы сосредоточимся в основном на этом типе

    Блокировка — все вызовы наблюдателя onNext будут синхронными, и невозможно отказаться от подписки в середине потока событий. Мы всегда можем преобразовать Observable в Blocking Observable, используя метод toBlocking:
    4.2. Операторы
BlockingObservable<String> blockingObservable = observable.toBlocking();

Оператор — это функция, которая принимает один Observable (источник) в качестве первого аргумента и возвращает другой Observable (пункт назначения). Затем для каждого элемента, который испускает наблюдаемый источник, он применяет функцию к этому элементу, а затем выдает результат в наблюдаемом приемнике.

Операторы могут быть объединены в цепочку для создания сложных потоков данных, фильтрующих события на основе определенных критериев. К одному и тому же наблюдаемому можно применять несколько операторов.

Нетрудно попасть в ситуацию, в которой Observable генерирует элементы быстрее, чем оператор или наблюдатель может их использовать. Подробнее об обратном давлении можно прочитать здесь.

4.3. Create Observable

Базовый оператор просто создает Observable, который перед завершением выдает один общий экземпляр, String «Hello». Когда мы хотим получить информацию из Observable, мы реализуем интерфейс наблюдателя, а затем вызываем подписку на нужный Observable:

«

Observable<String> observable = Observable.just("Hello");
observable.subscribe(s -> result = s);
 
assertTrue(result.equals("Hello"));

«4.4. OnNext, OnError и OnCompleted

В интерфейсе наблюдателя есть три метода, о которых мы хотим знать:

  1. OnNext is called on our observer each time a new event is published to the attached Observable. This is the method where we’ll perform some action on each event
  2. OnCompleted is called when the sequence of events associated with an Observable is complete, indicating that we should not expect any more onNext calls on our observer
  3. OnError is called when an unhandled exception is thrown during the RxJava framework code or our event handling code

Возвращаемое значение для метода subscribe Observables — это интерфейс подписки:

String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
Observable<String> observable = Observable.from(letters);
observable.subscribe(
  i -> result += i,  //OnNext
  Throwable::printStackTrace, //OnError
  () -> result += "_Completed" //OnCompleted
);
assertTrue(result.equals("abcdefg_Completed"));

5. Преобразования Observable и условные операторы

5.1. Map

Оператор карты преобразует элементы, испускаемые Observable, применяя функцию к каждому элементу.

Предположим, что есть объявленный массив строк, который содержит некоторые буквы алфавита, и мы хотим напечатать их заглавными буквами:

Observable.from(letters)
  .map(String::toUpperCase)
  .subscribe(letter -> result += letter);
assertTrue(result.equals("ABCDEFG"));

FlatMap можно использовать для выравнивания Observables всякий раз, когда мы получаем вложенные Observables .

Более подробную информацию о разнице между картой и плоской картой можно найти здесь.

Предположим, у нас есть метод, который возвращает Observable\u003cString\u003e из списка строк. Теперь мы будем печатать для каждой строки из нового Observable список заголовков на основе того, что видит подписчик:

Observable<String> getTitle() {
    return Observable.from(titleList);
}
Observable.just("book1", "book2")
  .flatMap(s -> getTitle())
  .subscribe(l -> result += l);

assertTrue(result.equals("titletitle"));

5.2. Scan

Оператор сканирования последовательно применяет функцию к каждому элементу, выдаваемому Observable, и выдает каждое последующее значение.

Это позволяет нам переносить состояние от события к событию:

String[] letters = {"a", "b", "c"};
Observable.from(letters)
  .scan(new StringBuilder(), StringBuilder::append)
  .subscribe(total -> result += total.toString());
assertTrue(result.equals("aababc"));

5.3. GroupBy

Оператор Group by позволяет нам классифицировать события во входном Observable по выходным категориям.

Предположим, что мы создали массив целых чисел от 0 до 10, затем применим группировку, которая разделит их на категории четные и нечетные:

Observable.from(numbers)
  .groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD")
  .subscribe(group ->
    group.subscribe((number) -> {
        if (group.getKey().toString().equals("EVEN")) {
            EVEN[0] += number;
        } else {
            ODD[0] += number;
        }
    })
  );
assertTrue(EVEN[0].equals("0246810"));
assertTrue(ODD[0].equals("13579"));

5.4. Filter

Оператор filter выделяет из наблюдаемого только те элементы, которые проходят тест предиката.

Итак, давайте отфильтруем нечетные числа в целочисленном массиве:

Observable.from(numbers)
  .filter(i -> (i % 2 == 1))
  .subscribe(i -> result += i);
 
assertTrue(result.equals("13579"));

5.5. Условные операторы

DefaultIfEmpty выдает элемент из исходного Observable или элемент по умолчанию, если исходный Observable пуст:

Observable.empty()
  .defaultIfEmpty("Observable is empty")
  .subscribe(s -> result += s);
 
assertTrue(result.equals("Observable is empty"));

Следующий код выдает первую букву алфавита «a», потому что буквы массива не пусто, и это то, что он содержит в первой позиции:

Observable.from(letters)
  .defaultIfEmpty("Observable is empty")
  .first()
  .subscribe(s -> result += s);
 
assertTrue(result.equals("a"));

Оператор TakeWhile отбрасывает элементы, испускаемые Observable после того, как указанное условие становится ложным:

Observable.from(numbers)
  .takeWhile(i -> i < 5)
  .subscribe(s -> sum[0] += s);
 
assertTrue(sum[0] == 10);

Конечно, есть и другие операторы, которые могли бы охватывать наши потребности, такие как Contain, SkipWhile, SkipUntil, TakeUntil и т. д.

6. Connectable Observable

ConnectableObservable похож на обычный Observable, за исключением того, что он не начинает испускать элементы, когда он подписан, а только когда соединение к нему применяется оператор.

Таким образом, мы можем дождаться, пока все предполагаемые наблюдатели подпишутся на Observable, прежде чем Observable начнет генерировать элементы: значений, выдает одно значение или уведомление об ошибке.

String[] result = {""};
ConnectableObservable<Long> connectable
  = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
connectable.subscribe(i -> result[0] += i);
assertFalse(result[0].equals("01"));

connectable.connect();
Thread.sleep(500);
 
assertTrue(result[0].equals("01"));

С этим источником данных мы можем использовать только два метода для подписки:

OnSuccess возвращает Single, который также вызывает указанный нами метод. OnError также возвращает Single, который немедленно уведомляет подписчиков об ошибке

8. Субъекты

    Субъект является одновременно двумя элементами, подписчиком и наблюдаемым. В качестве подписчика субъект может использоваться для публикации событий, происходящих из более чем одного наблюдаемого объекта.
String[] result = {""};
Single<String> single = Observable.just("Hello")
  .toSingle()
  .doOnSuccess(i -> result[0] += i)
  .doOnError(error -> {
      throw new RuntimeException(error.getMessage());
  });
single.subscribe();
 
assertTrue(result[0].equals("Hello"));

И поскольку это также можно наблюдать, события от нескольких подписчиков могут быть повторно отправлены как его события любому, кто его наблюдает.

В следующем примере мы рассмотрим, как наблюдатели смогут видеть события, происходящие после их подписки:

9. Управление ресурсами

Использование операции позволяет нам связывать ресурсы, например, соединение с базой данных JDBC, сетевое соединение или открытые файлы для наших наблюдаемых объектов.

Integer subscriber1 = 0;
Integer subscriber2 = 0;
Observer<Integer> getFirstObserver() {
    return new Observer<Integer>() {
        @Override
        public void onNext(Integer value) {
           subscriber1 += value;
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("error");
        }

        @Override
        public void onCompleted() {
            System.out.println("Subscriber1 completed");
        }
    };
}

Observer<Integer> getSecondObserver() {
    return new Observer<Integer>() {
        @Override
        public void onNext(Integer value) {
            subscriber2 += value;
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("error");
        }

        @Override
        public void onCompleted() {
            System.out.println("Subscriber2 completed");
        }
    };
}

PublishSubject<Integer> subject = PublishSubject.create(); 
subject.subscribe(getFirstObserver()); 
subject.onNext(1); 
subject.onNext(2); 
subject.onNext(3); 
subject.subscribe(getSecondObserver()); 
subject.onNext(4); 
subject.onCompleted();
 
assertTrue(subscriber1 + subscriber2 == 14)

Здесь мы представляем в комментариях шаги, которые нам нужно сделать для достижения этой цели, а также пример реализации:

10. Заключение

В этой статье мы рассказали, как использовать RxJava. библиотеку, а также как изучить ее наиболее важные функции.

String[] result = {""};
Observable<Character> values = Observable.using(
  () -> "MyResource",
  r -> {
      return Observable.create(o -> {
          for (Character c : r.toCharArray()) {
              o.onNext(c);
          }
          o.onCompleted();
      });
  },
  r -> System.out.println("Disposed: " + r)
);
values.subscribe(
  v -> result[0] += v,
  e -> result[0] += e
);
assertTrue(result[0].equals("MyResource"));

Полный исходный код проекта, включая все используемые здесь примеры кода, можно найти на Github.

«

The full source code for the project including all the code samples used here can be found over on Github.