«1. Обзор

В этом руководстве мы узнаем, как использовать реактивный HTTP-клиент от Jetty. Мы продемонстрируем его использование с различными библиотеками Reactive, создав небольшие тестовые примеры.

2. Что такое реактивный HttpClient?

HttpClient Jetty позволяет нам блокировать HTTP-запросы. Однако когда мы имеем дело с реактивным API, мы не можем использовать стандартный HTTP-клиент. Чтобы восполнить этот пробел, Jetty создала оболочку для API HttpClient, чтобы она также поддерживала API ReactiveStreams.

Реактивный HttpClient используется либо для потребления, либо для создания потока данных через вызовы HTTP.

В примере, который мы собираемся продемонстрировать, будет клиент Reactive HTTP, который будет взаимодействовать с сервером Jetty, используя различные библиотеки Reactive. Мы также поговорим о событиях запроса и ответа, предоставляемых Reactive HttpClient.

Мы рекомендуем прочитать наши статьи о Project Reactor, RxJava и Spring WebFlux, чтобы лучше понять концепции реактивного программирования и его терминологию.

3. Зависимости Maven

Давайте начнем пример с добавления зависимостей для Reactive Streams, Project Reactor, RxJava, Spring WebFlux и Jetty’s Reactive HTTPClient в наш pom.xml. Наряду с этим мы также добавим зависимость Jetty Server для создания сервера:

<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-reactive-httpclient</artifactId>
    <version>1.0.3</version>
</dependency>
<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-server</artifactId>
    <version>9.4.19.v20190610</version>
</dependency>
<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams</artifactId>
    <version>1.0.3</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.2.12.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.11</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webflux</artifactId>
    <version>5.1.9.RELEASE</version>
</dependency>

4. Создание сервера и клиента

Теперь давайте создадим сервер и добавим обработчик запросов, который просто записывает тело запроса в ответ:

public class RequestHandler extends AbstractHandler {
    @Override
    public void handle(String target, Request jettyRequest, HttpServletRequest request,
      HttpServletResponse response) throws IOException, ServletException {
        jettyRequest.setHandled(true);
        response.setContentType(request.getContentType());
        IO.copy(request.getInputStream(), response.getOutputStream());
    }
}

...

Server server = new Server(8080);
server.setHandler(new RequestHandler());
server.start();

И затем мы можем написать HttpClient:

HttpClient httpClient = new HttpClient();
httpClient.start();

Теперь, когда мы создали клиент и сервер, давайте посмотрим, как мы можем преобразовать этот блокирующий HTTP-клиент в неблокирующий и создайте запрос:

Request request = httpClient.newRequest("http://localhost:8080/"); 
ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request).build();
Publisher<ReactiveResponse> publisher = reactiveRequest.response();

Итак, здесь оболочка ReactiveRequest, предоставленная Jetty, сделала наш блокирующий HTTP-клиент реактивным. Давайте продолжим и посмотрим на его использование с различными реактивными библиотеками.

5. Использование ReactiveStreams

HttpClient Jetty изначально поддерживает Reactive Streams, так что давайте начнем с этого.

Теперь Reactive Streams — это просто набор интерфейсов, поэтому для нашего тестирования давайте реализуем простого блокирующего подписчика:

public class BlockingSubscriber implements Subscriber<ReactiveResponse> {
    BlockingQueue<ReactiveResponse> sink = new LinkedBlockingQueue<>(1);

    @Override
    public void onSubscribe(Subscription subscription) { 
        subscription.request(1); 
    }
  
    @Override 
    public void onNext(ReactiveResponse response) { 
        sink.offer(response);
    } 
   
    @Override 
    public void onError(Throwable failure) { } 

    @Override 
    public void onComplete() { }

    public ReactiveResponse block() throws InterruptedException {
        return sink.poll(5, TimeUnit.SECONDS);
    }   
}

Обратите внимание, что нам нужно было вызвать Subscription#request согласно JavaDoc, в котором говорится, что â €œИздатель не будет отправлять события до тех пор, пока с помощью этого метода не будет сообщено о запросе.

Также обратите внимание, что мы добавили механизм безопасности, чтобы наш тест мог выйти из строя, если он не увидел значение в 5 секунды.

Теперь мы можем быстро протестировать наш HTTP-запрос:

BlockingSubscriber subscriber = new BlockingSubscriber();
publisher.subscribe(subscriber);
ReactiveResponse response = subscriber.block();
Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);

6. Использование Project Reactor

Давайте теперь посмотрим, как мы можем использовать Reactive HttpClient с Project Reactor. Создание издателя практически такое же, как и в предыдущем разделе.

После создания издателя воспользуемся классом Mono из Project Reactor, чтобы получить реактивный ответ:

ReactiveResponse response = Mono.from(publisher).block();

Затем мы можем протестировать полученный ответ:

Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);

6.1. Использование Spring WebFlux

Преобразование блокирующего HTTP-клиента в реактивный выполняется легко при использовании Spring WebFlux. Spring WebFlux поставляется с реактивным клиентом WebClient, который можно использовать с различными библиотеками HTTP-клиентов. Мы можем использовать это как альтернативу использованию прямого кода Project Reactor.

Итак, сначала давайте обернем HTTP-клиент Jetty с помощью JettyClientHttpConnector, чтобы связать его с WebClient:

ClientHttpConnector clientConnector = new JettyClientHttpConnector(httpClient);

А затем передаем этот коннектор WebClient для выполнения неблокирующих HTTP-запросов:

WebClient client = WebClient.builder().clientConnector(clientConnector).build();

Затем давайте выполним фактический HTTP-вызов с только что созданным реактивным HTTP-клиентом и проверим результат:

String responseContent = client.post()
  .uri("http://localhost:8080/").contentType(MediaType.TEXT_PLAIN)
  .body(BodyInserters.fromPublisher(Mono.just("Hello World!"), String.class))
  .retrieve()
  .bodyToMono(String.class)
  .block();
Assert.assertNotNull(responseContent);
Assert.assertEquals("Hello World!", responseContent);

7. Использование RxJava2

Давайте теперь продолжим и посмотрим, как реактивный HTTP-клиент используется с RxJava2. .

Пока мы здесь, давайте немного изменим наш пример, чтобы теперь он включал тело запроса:

ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request)
  .content(ReactiveRequest.Content
    .fromString("Hello World!", "text/plain", StandardCharsets.UTF_8))
  .build();
Publisher<String> publisher = reactiveRequest
  .response(ReactiveResponse.Content.asString());

Код ReactiveResponse.Content.asString() преобразует тело ответа в строку. Также можно отклонить ответ с помощью метода ReactiveResponse.Content.discard(), если нас интересует только статус запроса.

«Теперь мы видим, что получение ответа с помощью RxJava2 на самом деле очень похоже на Project Reactor. По сути, мы просто используем Single вместо Mono:

String responseContent = Single.fromPublisher(publisher)
  .blockingGet();

Assert.assertEquals("Hello World!", responseContent);

8. События запроса и ответа

Реактивный HTTP-клиент генерирует ряд событий во время выполнения. Они классифицируются как события запроса и события ответа. Эти события помогают заглянуть в жизненный цикл реактивного HTTP-клиента.

На этот раз давайте немного по-другому сделаем наш реактивный запрос, используя HTTP-клиент вместо запроса:

ReactiveRequest request = ReactiveRequest.newBuilder(httpClient, "http://localhost:8080/")
  .content(ReactiveRequest.Content.fromString("Hello World!", "text/plain", UTF_8))
  .build();

А теперь давайте получим Publisher событий HTTP-запроса:

Publisher<ReactiveRequest.Event> requestEvents = request.requestEvents();

Теперь давайте воспользуемся RxJava еще раз. На этот раз мы создадим список, содержащий типы событий, и заполним его, подписавшись на события запроса по мере их возникновения:

List<Type> requestEventTypes = new ArrayList<>();

Flowable.fromPublisher(requestEvents)
  .map(ReactiveRequest.Event::getType).subscribe(requestEventTypes::add);
Single<ReactiveResponse> response = Single.fromPublisher(request.response());

Затем, поскольку мы находимся в тесте, мы можем заблокировать наш ответ. и проверяем:

int actualStatus = response.blockingGet().getStatus();

Assert.assertEquals(6, requestEventTypes.size());
Assert.assertEquals(HttpStatus.OK_200, actualStatus);

Точно так же мы можем подписаться и на события ответа. Поскольку они аналогичны подписке на событие запроса, мы добавили сюда только последнюю. Полную реализацию с событиями запроса и ответа можно найти в репозитории GitHub, ссылка на который приведена в конце этой статьи.

9. Заключение

В этом руководстве мы узнали о ReactiveStreams HttpClient, предоставляемом Jetty, его использовании с различными библиотеками Reactive и событиях жизненного цикла, связанных с Reactive запросом.

Все фрагменты кода, упомянутые в статье, можно найти в нашем репозитории GitHub.