«1. Введение

RxJava — одна из самых популярных библиотек реактивного программирования.

И Ratpack — это набор библиотек Java для создания компактных и мощных веб-приложений, построенных на Netty.

В этом уроке мы обсудим включение RxJava в приложение Ratpack для создания красивого реактивного веб-приложения.

2. Зависимости Maven

Теперь нам нужны зависимости ratpack-core и ratpack-rx:

<dependency>
    <groupId>io.ratpack</groupId>
    <artifactId>ratpack-core</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>io.ratpack</groupId>
    <artifactId>ratpack-rx</artifactId>
    <version>1.6.0</version>
</dependency>

Обратите внимание, что ratpack-rx импортирует для нас зависимость rxjava.

3. Первоначальная настройка

RxJava поддерживает интеграцию сторонних библиотек, используя свою систему плагинов. Таким образом, мы можем включить различные стратегии выполнения в модель выполнения RxJava.

Ratpack подключается к этой модели выполнения через RxRatpack, который мы инициализируем при запуске:

RxRatpack.initialise();

Теперь важно отметить, что этот метод нужно вызывать только один раз при запуске JVM.

В результате мы сможем отображать Observables RxJava в типы Promise RxRatpack и наоборот.

4. Observable в обещания

Мы можем преобразовать Observable в RxJava в обещание Ratpack.

Однако есть небольшое несоответствие. Видите ли, Promise выдает одно значение, а Observable может выдавать их поток.

RxRatpack справляется с этим, предлагая два разных метода: promiseSingle() и promise().

Итак, давайте предположим, что у нас есть служба с именем MovieService, которая выдает одно обещание в getMovie(). Мы бы использовали promiseSingle(), так как знаем, что он выдаст только один раз:

Handler movieHandler = (ctx) -> {
    MovieService movieSvc = ctx.get(MovieService.class);
    Observable<Movie> movieObs = movieSvc.getMovie();
    RxRatpack.promiseSingle(movieObs)
      .then(movie -> ctx.render(Jackson.json(movie)));
};

С другой стороны, если getMovies() может вернуть поток результатов фильма, мы бы использовали promise():

Handler moviesHandler = (ctx) -> {
    MovieService movieSvc = ctx.get(MovieService.class);
    Observable<Movie> movieObs = movieSvc.getMovies();
    RxRatpack.promise(movieObs)
      .then(movie -> ctx.render(Jackson.json(movie)));
};

Затем мы можем добавить эти обработчики на наш сервер Ratpack, как обычно:

RatpackServer.start(def -> def.registryOf(rSpec -> rSpec.add(MovieService.class, new MovieServiceImpl()))
  .handlers(chain -> chain
    .get("movie", movieHandler)
    .get("movies", moviesHandler)));

5. Промисы для наблюдаемых

И наоборот, мы можем сопоставить тип Promise в Ratpack обратно с наблюдаемым RxJava.

RxRatpack снова имеет два метода:Observ() иObservEach().

В этом случае мы представим, что у нас есть киносервис, который возвращает Promises вместо Observables.

С нашим getMovie() мы бы использовалиObserv():

Handler moviePromiseHandler = ctx -> {
    MoviePromiseService promiseSvc = ctx.get(MoviePromiseService.class);
    Promise<Movie> moviePromise = promiseSvc.getMovie();
    RxRatpack.observe(moviePromise)
      .subscribe(movie -> ctx.render(Jackson.json(movie)));
};

И когда мы получаем список, как и с getMovies(), мы бы использовалиObservEach():

Handler moviesPromiseHandler = ctx -> {
    MoviePromiseService promiseSvc = ctx.get(MoviePromiseService.class);
    Promise<List<Movie>> moviePromises = promiseSvc.getMovies();
    RxRatpack.observeEach(moviePromises)
        .toList()
        .subscribe(movie -> ctx.render(Jackson.json(movie)));
};

~~ ~ Затем снова можно добавить обработчики, как и ожидалось:

RatpackServer.start(def -> def.registryOf(regSpec -> regSpec
  .add(MoviePromiseService.class, new MoviePromiseServiceImpl()))
    .handlers(chain -> chain
      .get("movie", moviePromiseHandler)
      .get("movies", moviesPromiseHandler)));

6. Параллельная обработка

RxRatpack поддерживает параллелизм с помощью методов fork() и forkEach().

И это следует схеме, которую мы уже видели с каждым.

fork() берет один Observable и распараллеливает его выполнение в другом вычислительном потоке. Затем он автоматически привязывает данные обратно к исходному выполнению.

С другой стороны, forkEach() делает то же самое для каждого элемента, испускаемого потоком значений Observable.

Давайте на мгновение представим, что мы хотим использовать заглавные буквы в названиях наших фильмов, и что это дорогостоящая операция.

Проще говоря, мы можем использовать forkEach() для переноса выполнения каждого из них на пул потоков:

Observable<Movie> movieObs = movieSvc.getMovies();
Observable<String> upperCasedNames = movieObs.compose(RxRatpack::forkEach)
  .map(movie -> movie.getName().toUpperCase())
  .serialize();

7. Неявная обработка ошибок

Наконец, неявная обработка ошибок является одной из ключевых функций. в интеграции RxJava.

По умолчанию наблюдаемые последовательности RxJava пересылают любое исключение обработчику исключений контекста выполнения. По этой причине обработчики ошибок не нужно определять в наблюдаемых последовательностях.

Итак, мы можем настроить Ratpack для обработки этих ошибок, вызванных RxJava.

Предположим, например, что мы хотим, чтобы каждая ошибка выводилась в HTTP-ответе.

Обратите внимание, что исключение, которое мы выбрасываем через Observable, перехватывается и обрабатывается нашим ServerErrorHandler:

RatpackServer.start(def -> def.registryOf(regSpec -> regSpec
  .add(ServerErrorHandler.class, (ctx, throwable) -> {
        ctx.render("Error caught by handler : " + throwable.getMessage());
    }))
  .handlers(chain -> chain
    .get("error", ctx -> {
        Observable.<String> error(new Exception("Error from observable")).subscribe(s -> {});
    })));

Обратите внимание, что любая обработка ошибок на уровне подписчика имеет приоритет. Если бы наш Observable хотел сделать свою собственную обработку ошибок, он мог бы это сделать, но поскольку это не так, исключение просачивается до Ratpack.

8. Заключение

В этой статье мы говорили о том, как настроить RxJava с помощью Ratpack.

Мы исследовали преобразование Observables в RxJava в типы Promise в Ratpack и наоборот. Мы также изучили функции параллелизма и неявной обработки ошибок, поддерживаемые интеграцией.

Все образцы кода, использованные в статье, можно найти на Github.