«1. Обзор
В этом руководстве мы узнаем, как использовать реактивный HTTP-клиент от Jetty. Мы продемонстрируем его использование с различными библиотеками Reactive, создав небольшие тестовые примеры.
2. Что такое реактивный HttpClient?
HttpClient Jetty позволяет нам блокировать HTTP-запросы. Однако когда мы имеем дело с реактивным API, мы не можем использовать стандартный HTTP-клиент. Чтобы восполнить этот пробел, Jetty создала оболочку для API HttpClient, чтобы она также поддерживала API ReactiveStreams.
Реактивный HttpClient используется либо для потребления, либо для создания потока данных через вызовы HTTP.
В примере, который мы собираемся продемонстрировать, будет клиент Reactive HTTP, который будет взаимодействовать с сервером Jetty, используя различные библиотеки Reactive. Мы также поговорим о событиях запроса и ответа, предоставляемых Reactive HttpClient.
Мы рекомендуем прочитать наши статьи о Project Reactor, RxJava и Spring WebFlux, чтобы лучше понять концепции реактивного программирования и его терминологию.
3. Зависимости Maven
Давайте начнем пример с добавления зависимостей для Reactive Streams, Project Reactor, RxJava, Spring WebFlux и Jetty’s Reactive HTTPClient в наш pom.xml. Наряду с этим мы также добавим зависимость Jetty Server для создания сервера:
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-reactive-httpclient</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>9.4.19.v20190610</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.12.RELEASE</version>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.11</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
<version>5.1.9.RELEASE</version>
</dependency>
4. Создание сервера и клиента
Теперь давайте создадим сервер и добавим обработчик запросов, который просто записывает тело запроса в ответ:
public class RequestHandler extends AbstractHandler {
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request,
HttpServletResponse response) throws IOException, ServletException {
jettyRequest.setHandled(true);
response.setContentType(request.getContentType());
IO.copy(request.getInputStream(), response.getOutputStream());
}
}
...
Server server = new Server(8080);
server.setHandler(new RequestHandler());
server.start();
И затем мы можем написать HttpClient:
HttpClient httpClient = new HttpClient();
httpClient.start();
Теперь, когда мы создали клиент и сервер, давайте посмотрим, как мы можем преобразовать этот блокирующий HTTP-клиент в неблокирующий и создайте запрос:
Request request = httpClient.newRequest("http://localhost:8080/");
ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request).build();
Publisher<ReactiveResponse> publisher = reactiveRequest.response();
Итак, здесь оболочка ReactiveRequest, предоставленная Jetty, сделала наш блокирующий HTTP-клиент реактивным. Давайте продолжим и посмотрим на его использование с различными реактивными библиотеками.
5. Использование ReactiveStreams
HttpClient Jetty изначально поддерживает Reactive Streams, так что давайте начнем с этого.
Теперь Reactive Streams — это просто набор интерфейсов, поэтому для нашего тестирования давайте реализуем простого блокирующего подписчика:
public class BlockingSubscriber implements Subscriber<ReactiveResponse> {
BlockingQueue<ReactiveResponse> sink = new LinkedBlockingQueue<>(1);
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(1);
}
@Override
public void onNext(ReactiveResponse response) {
sink.offer(response);
}
@Override
public void onError(Throwable failure) { }
@Override
public void onComplete() { }
public ReactiveResponse block() throws InterruptedException {
return sink.poll(5, TimeUnit.SECONDS);
}
}
Обратите внимание, что нам нужно было вызвать Subscription#request согласно JavaDoc, в котором говорится, что â €œИздатель не будет отправлять события до тех пор, пока с помощью этого метода не будет сообщено о запросе.
Также обратите внимание, что мы добавили механизм безопасности, чтобы наш тест мог выйти из строя, если он не увидел значение в 5 секунды.
Теперь мы можем быстро протестировать наш HTTP-запрос:
BlockingSubscriber subscriber = new BlockingSubscriber();
publisher.subscribe(subscriber);
ReactiveResponse response = subscriber.block();
Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);
6. Использование Project Reactor
Давайте теперь посмотрим, как мы можем использовать Reactive HttpClient с Project Reactor. Создание издателя практически такое же, как и в предыдущем разделе.
После создания издателя воспользуемся классом Mono из Project Reactor, чтобы получить реактивный ответ:
ReactiveResponse response = Mono.from(publisher).block();
Затем мы можем протестировать полученный ответ:
Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);
6.1. Использование Spring WebFlux
Преобразование блокирующего HTTP-клиента в реактивный выполняется легко при использовании Spring WebFlux. Spring WebFlux поставляется с реактивным клиентом WebClient, который можно использовать с различными библиотеками HTTP-клиентов. Мы можем использовать это как альтернативу использованию прямого кода Project Reactor.
Итак, сначала давайте обернем HTTP-клиент Jetty с помощью JettyClientHttpConnector, чтобы связать его с WebClient:
ClientHttpConnector clientConnector = new JettyClientHttpConnector(httpClient);
А затем передаем этот коннектор WebClient для выполнения неблокирующих HTTP-запросов:
WebClient client = WebClient.builder().clientConnector(clientConnector).build();
Затем давайте выполним фактический HTTP-вызов с только что созданным реактивным HTTP-клиентом и проверим результат:
String responseContent = client.post()
.uri("http://localhost:8080/").contentType(MediaType.TEXT_PLAIN)
.body(BodyInserters.fromPublisher(Mono.just("Hello World!"), String.class))
.retrieve()
.bodyToMono(String.class)
.block();
Assert.assertNotNull(responseContent);
Assert.assertEquals("Hello World!", responseContent);
7. Использование RxJava2
Давайте теперь продолжим и посмотрим, как реактивный HTTP-клиент используется с RxJava2. .
Пока мы здесь, давайте немного изменим наш пример, чтобы теперь он включал тело запроса:
ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request)
.content(ReactiveRequest.Content
.fromString("Hello World!", "text/plain", StandardCharsets.UTF_8))
.build();
Publisher<String> publisher = reactiveRequest
.response(ReactiveResponse.Content.asString());
Код ReactiveResponse.Content.asString() преобразует тело ответа в строку. Также можно отклонить ответ с помощью метода ReactiveResponse.Content.discard(), если нас интересует только статус запроса.
«Теперь мы видим, что получение ответа с помощью RxJava2 на самом деле очень похоже на Project Reactor. По сути, мы просто используем Single вместо Mono:
String responseContent = Single.fromPublisher(publisher)
.blockingGet();
Assert.assertEquals("Hello World!", responseContent);
8. События запроса и ответа
Реактивный HTTP-клиент генерирует ряд событий во время выполнения. Они классифицируются как события запроса и события ответа. Эти события помогают заглянуть в жизненный цикл реактивного HTTP-клиента.
На этот раз давайте немного по-другому сделаем наш реактивный запрос, используя HTTP-клиент вместо запроса:
ReactiveRequest request = ReactiveRequest.newBuilder(httpClient, "http://localhost:8080/")
.content(ReactiveRequest.Content.fromString("Hello World!", "text/plain", UTF_8))
.build();
А теперь давайте получим Publisher событий HTTP-запроса:
Publisher<ReactiveRequest.Event> requestEvents = request.requestEvents();
Теперь давайте воспользуемся RxJava еще раз. На этот раз мы создадим список, содержащий типы событий, и заполним его, подписавшись на события запроса по мере их возникновения:
List<Type> requestEventTypes = new ArrayList<>();
Flowable.fromPublisher(requestEvents)
.map(ReactiveRequest.Event::getType).subscribe(requestEventTypes::add);
Single<ReactiveResponse> response = Single.fromPublisher(request.response());
Затем, поскольку мы находимся в тесте, мы можем заблокировать наш ответ. и проверяем:
int actualStatus = response.blockingGet().getStatus();
Assert.assertEquals(6, requestEventTypes.size());
Assert.assertEquals(HttpStatus.OK_200, actualStatus);
Точно так же мы можем подписаться и на события ответа. Поскольку они аналогичны подписке на событие запроса, мы добавили сюда только последнюю. Полную реализацию с событиями запроса и ответа можно найти в репозитории GitHub, ссылка на который приведена в конце этой статьи.
9. Заключение
В этом руководстве мы узнали о ReactiveStreams HttpClient, предоставляемом Jetty, его использовании с различными библиотеками Reactive и событиях жизненного цикла, связанных с Reactive запросом.
Все фрагменты кода, упомянутые в статье, можно найти в нашем репозитории GitHub.