«1. Обзор

Часто нашим веб-службам необходимо использовать другие веб-службы для выполнения своей работы. Может быть сложно обслуживать запросы пользователей, сохраняя при этом малое время отклика. Медленная внешняя служба может увеличить наше время отклика и привести к тому, что наша система будет накапливать запросы, используя больше ресурсов. Вот где неблокирующий подход может быть очень полезен

В этом руководстве мы будем запускать несколько асинхронных запросов к службе из приложения Play Framework. Используя неблокирующие HTTP-возможности Java, мы сможем беспрепятственно запрашивать внешние ресурсы, не затрагивая нашу собственную основную логику.

В нашем примере мы рассмотрим библиотеку Play WebService.

2. Библиотека Play WebService (WS)

WS — мощная библиотека, обеспечивающая асинхронные HTTP-вызовы с использованием Java Action.

Используя эту библиотеку, наш код отправляет эти запросы и продолжает работу без блокировки. Для обработки результата запроса мы предоставляем потребляющую функцию, то есть реализацию интерфейса Consumer.

Этот шаблон имеет некоторое сходство с реализацией обратных вызовов, промисов и шаблоном async/await в JavaScript.

Давайте создадим простой Потребитель, который регистрирует некоторые данные ответа:

ws.url(url)
  .thenAccept(r -> 
    log.debug("Thread#" + Thread.currentThread().getId() 
      + " Request complete: Response code = " + r.getStatus() 
      + " | Response: " + r.getBody() 
      + " | Current Time:" + System.currentTimeMillis()))

В этом примере наш Потребитель просто ведет журнал. Однако потребитель может делать с результатом все, что нам нужно, например сохранять результат в базе данных.

Если мы углубимся в реализацию библиотеки, то увидим, что WS обертывает и настраивает Java AsyncHttpClient, который является частью стандартного JDK и не зависит от Play.

3. Подготовьте пример проекта

Чтобы поэкспериментировать с фреймворком, давайте создадим несколько модульных тестов для запуска запросов. Мы создадим каркас веб-приложения, чтобы отвечать на них, и будем использовать инфраструктуру WS для выполнения HTTP-запросов.

3.1. Скелет веб-приложения

Прежде всего, мы создаем исходный проект с помощью команды sbt new:

sbt new playframework/play-java-seed.g8

Затем в новой папке мы редактируем файл build.sbt и добавляем зависимость библиотеки WS: ~ ~~

libraryDependencies += javaWs

Теперь мы можем запустить сервер с помощью команды sbt run:

$ sbt run
...
--- (Running the application, auto-reloading is enabled) ---

[info] p.c.s.AkkaHttpServer - Listening for HTTP on /0:0:0:0:0:0:0:0:9000

После запуска приложения мы можем проверить, все ли в порядке, просмотрев http://localhost:9000, после чего откроется приветствие Play. страница.

3.2. Среда тестирования

Для тестирования нашего приложения мы будем использовать класс модульного тестирования HomeControllerTest.

Во-первых, нам нужно расширить WithServer, который обеспечит жизненный цикл сервера:

public class HomeControllerTest extends WithServer {

Благодаря своему родителю этот класс теперь запускает наш каркас веб-сервера в тестовом режиме и на случайном порту перед запуском тестов. Класс WithServer также останавливает приложение после завершения теста.

Далее нам нужно предоставить приложение для запуска.

Мы можем создать его с помощью GuiceApplicationBuilder от Guice:

@Override
protected Application provideApplication() {
    return new GuiceApplicationBuilder().build();
}

И, наконец, мы настраиваем URL-адрес сервера для использования в наших тестах, используя номер порта, предоставленный тестовым сервером:

@Override
@Before
public void setup() {
    OptionalInt optHttpsPort = testServer.getRunningHttpsPort();
    if (optHttpsPort.isPresent()) {
        port = optHttpsPort.getAsInt();
        url = "https://localhost:" + port;
    } else {
        port = testServer.getRunningHttpPort()
          .getAsInt();
        url = "http://localhost:" + port;
    }
}

~~ ~ Теперь мы готовы писать тесты. Комплексная среда тестирования позволяет нам сконцентрироваться на кодировании наших тестовых запросов.

4. Подготовьте WSRequest

Давайте посмотрим, как мы можем запускать базовые типы запросов, такие как GET или POST, и составные запросы на загрузку файла.

4.1. Инициализация объекта WSRequest

Прежде всего, нам нужно получить экземпляр WSClient для настройки и инициализации наших запросов.

В реальном приложении мы можем получить клиент, автоматически сконфигурированный с настройками по умолчанию, путем внедрения зависимостей:

@Autowired
WSClient ws;

Однако в нашем тестовом классе мы используем WSTestClient, доступный из среды Play Test:

WSClient ws = play.test.WSTestClient.newClient(port);

Когда у нас есть клиент, мы можем инициализировать объект WSRequest, вызвав метод url:

ws.url(url)

Метод url делает достаточно, чтобы позволить нам запустить запрос. Однако мы можем настроить его дальше, добавив некоторые пользовательские настройки:

ws.url(url)
  .addHeader("key", "value")
  .addQueryParameter("num", "" + num);

Как мы видим, добавить заголовки и параметры запроса довольно просто.

После того, как мы полностью настроили наш запрос, мы можем вызвать метод, чтобы инициировать его.

4.2. Общий запрос GET

Чтобы инициировать запрос GET, нам просто нужно вызвать метод get для нашего объекта WSRequest:

ws.url(url)
  ...
  .get();

«

«Поскольку это неблокирующий код, он запускает запрос, а затем продолжает выполнение на следующей строке нашей функции.

Объект, возвращаемый get, является экземпляром CompletionStage, который является частью CompletableFuture API.

После завершения HTTP-вызова на этом этапе выполняется всего несколько инструкций. Он заключает ответ в объект WSResponse.

Обычно этот результат передается на следующий этап цепочки выполнения. В этом примере мы не предоставили никакой потребляющей функции, поэтому результат потерян.

По этой причине этот запрос относится к типу «запустил и забыл».

4.3. Отправить форму

Отправка формы не сильно отличается от примера с get.

ws.url(url)
  ...
  .setContentType("application/x-www-form-urlencoded")
  .post("key1=value1&key2=value2");

Чтобы инициировать запрос, мы просто вызываем метод post:

В этом сценарии нам нужно передать тело в качестве параметра. Это может быть простая строка, такая как файл, документ json или xml, BodyWritable или Source.

4.4. Отправка данных составной части/формы

Составная форма требует от нас отправки как полей ввода, так и данных из прикрепленного файла или потока.

Чтобы реализовать это во фреймворке, мы используем метод post с источником.

Source<ByteString, ?> file = FileIO.fromPath(Paths.get("hello.txt"));
FilePart<Source<ByteString, ?>> file = 
  new FilePart<>("fileParam", "myfile.txt", "text/plain", file);
DataPart data = new DataPart("key", "value");

ws.url(url)
...
  .post(Source.from(Arrays.asList(file, data)));

Внутри источника мы можем обернуть все различные типы данных, необходимые для нашей формы:

Хотя этот подход добавляет некоторые дополнительные настройки, он все еще очень похож на другие типы запросов.

5. Обработка асинхронного ответа

До этого момента мы запускали только запросы типа «выстрелил-забыл», когда наш код ничего не делал с данными ответа.

Давайте теперь рассмотрим два метода обработки асинхронного ответа.

Мы можем либо заблокировать основной поток, ожидая CompletableFuture, либо потреблять асинхронно с Consumer.

5.1. Обработка ответа путем блокировки с помощью CompletableFuture

Даже при использовании асинхронной среды мы можем заблокировать выполнение нашего кода и дождаться ответа.

WSResponse response = ws.url(url)
  .get()
  .toCompletableFuture()
  .get();

Используя API CompletableFuture, нам нужно всего лишь несколько изменений в нашем коде для реализации этого сценария:

Это может быть полезно, например, для обеспечения строгой согласованности данных, которую мы не можем достичь другими способами.

5.2. Асинхронная обработка ответа

Чтобы обработать асинхронный ответ без блокировки, мы предоставляем потребителя или функцию, которая запускается асинхронной структурой, когда ответ доступен.

ws.url(url)
  .addHeader("key", "value")
  .addQueryParameter("num", "" + 1)
  .get()
  .thenAccept(r -> 
    log.debug("Thread#" + Thread.currentThread().getId() 
      + " Request complete: Response code = " + r.getStatus() 
      + " | Response: " + r.getBody() 
      + " | Current Time:" + System.currentTimeMillis()));

Например, давайте добавим Consumer в наш предыдущий пример для регистрации ответа:

[debug] c.HomeControllerTest - Thread#30 Request complete: Response code = 200 | Response: {
  "Result" : "ok",
  "Params" : {
    "num" : [ "1" ]
  },
  "Headers" : {
    "accept" : [ "*/*" ],
    "host" : [ "localhost:19001" ],
    "key" : [ "value" ],
    "user-agent" : [ "AHC/2.1" ]
  }
} | Current Time:1579303109613

Затем мы увидим ответ в журналах:

Стоит отметить, что мы использовали thenAccept, который требует функция Consumer, так как нам не нужно ничего возвращать после регистрации.

Когда мы хотим, чтобы текущая стадия что-то вернула, чтобы мы могли использовать это на следующей стадии, вместо этого нам нужно использовать thenApply, который принимает функцию.

Они используют соглашения стандартных функциональных интерфейсов Java.

5.3. Большое тело ответа

Код, который мы реализовали до сих пор, является хорошим решением для небольших ответов и большинства вариантов использования. Однако если нам нужно обработать несколько сотен мегабайт данных, нам понадобится лучшая стратегия.

Следует отметить: методы запроса, такие как get и post, загружают в память весь ответ.

Чтобы избежать возможной ошибки OutOfMemoryError, мы можем использовать Akka Streams для обработки ответа, не позволяя ему заполнить нашу память.

ws.url(url)
  .stream()
  .thenAccept(
    response -> {
        try {
            OutputStream outputStream = Files.newOutputStream(path);
            Sink<ByteString, CompletionStage<Done>> outputWriter =
              Sink.foreach(bytes -> outputStream.write(bytes.toArray()));
            response.getBodyAsSource().runWith(outputWriter, materializer);
        } catch (IOException e) {
            log.error("An error happened while opening the output stream", e);
        }
    });

Например, мы можем записать его тело в файл:

Метод потока возвращает CompletionStage, где WSResponse имеет метод getBodyAsStream, предоставляющий Source\u003cByteString, ?\u003e.

Мы можем указать коду, как обрабатывать этот тип тела, используя Akka’s Sink, который в нашем примере будет просто записывать любые данные, проходящие через OutputStream.

5.4. Тайм-ауты

При создании запроса мы также можем установить определенный тайм-аут, поэтому запрос прерывается, если мы не получаем полный ответ вовремя.

Это особенно полезная функция, когда мы видим, что служба, к которой мы обращаемся, работает особенно медленно и может привести к скоплению открытых соединений, зависающих в ожидании ответа.

ws.url(url)
  .setRequestTimeout(Duration.of(1, SECONDS));

«Мы можем установить глобальный тайм-аут для всех наших запросов, используя параметры настройки. Для тайм-аута для конкретного запроса мы можем добавить к запросу, используя setRequestTimeout:

Есть еще один случай, который нужно обработать: мы, возможно, получили все данные, но наш потребитель может очень медленно их обрабатывать. Это может произойти при большом количестве обработки данных, вызовов базы данных и т. д.

В системах с низкой пропускной способностью мы можем просто позволить коду выполняться до его завершения. Однако мы можем захотеть прервать длительные действия.

Чтобы достичь этого, мы должны обернуть наш код некоторой обработкой фьючерсов.

ws.url(url)
  .get()
  .thenApply(
    result -> { 
        try { 
            Thread.sleep(10000L); 
            return Results.ok(); 
        } catch (InterruptedException e) { 
            return Results.status(SERVICE_UNAVAILABLE); 
        } 
    });

Давайте смоделируем очень долгий процесс в нашем коде:

Это вернет ответ OK через 10 секунд, но мы не хотим ждать так долго.

CompletionStage<Result> f = futures.timeout(
  ws.url(url)
    .get()
    .thenApply(result -> {
        try {
            Thread.sleep(10000L);
            return Results.ok();
        } catch (InterruptedException e) {
            return Results.status(SERVICE_UNAVAILABLE);
        }
    }), 1L, TimeUnit.SECONDS);

Вместо этого с помощью обёртки тайм-аута мы указываем нашему коду ждать не более 1 секунды:

Теперь наше будущее вернёт результат в любом случае: результат вычисления, если Потребитель завершится вовремя, или исключение из-за тайм-аута фьючерсов.

5.5. Обработка исключений

В предыдущем примере мы создали функцию, которая либо возвращает результат, либо завершается ошибкой с исключением. Итак, теперь нам нужно обработать оба сценария.

Мы можем обрабатывать как успешные, так и неудачные сценарии с помощью метода handleAsync.

CompletionStage<Object> res = f.handleAsync((result, e) -> {
    if (e != null) {
        log.error("Exception thrown", e);
        return e.getCause();
    } else {
        return result;
    }
});

Допустим, мы хотим вернуть результат, если он у нас есть, или зарегистрировать ошибку и вернуть исключение для дальнейшей обработки:

Теперь код должен вернуть CompletionStage, содержащий сгенерированное TimeoutException.

Class<?> clazz = res.toCompletableFuture().get().getClass();
assertEquals(TimeoutException.class, clazz);

Мы можем проверить это, просто вызвав assertEquals для класса возвращенного объекта исключения:

[error] c.HomeControllerTest - Exception thrown
java.util.concurrent.TimeoutException: Timeout after 1 second
...

При запуске теста он также зарегистрирует полученное исключение:

6. Запрос Фильтры

Иногда нам нужно запустить некоторую логику перед запуском запроса.

Мы могли бы манипулировать объектом WSRequest после его инициализации, но более элегантным способом является установка WSRequestFilter.

Фильтр может быть установлен во время инициализации, перед вызовом метода триггера, и привязан к логике запроса.

Мы можем определить свой собственный фильтр, реализовав интерфейс WSRequestFilter, или можем добавить уже готовый.

Обычный сценарий — протоколирование того, как выглядит запрос, перед его выполнением.

ws.url(url)
  ...
  .setRequestFilter(new AhcCurlRequestLogger())
  ...
  .get();

В этом случае нам просто нужно установить AhcCurlRequestLogger:

[info] p.l.w.a.AhcCurlRequestLogger - curl \
  --verbose \
  --request GET \
  --header 'key: value' \
  'http://localhost:19001'

Полученный журнал имеет формат, подобный curl:

Мы можем установить желаемый уровень журнала, изменив наш журнал. xml конфигурации.

7. Кэширование ответов

WSClient также поддерживает кэширование ответов.

Эта функция особенно полезна, когда один и тот же запрос инициируется несколько раз, и нам не нужны каждый раз самые свежие данные.

Это также помогает, когда служба, которую мы вызываем, временно не работает.

7.1. Добавление зависимостей кэширования

libraryDependencies += ehcache

Чтобы настроить кэширование, нам нужно сначала добавить зависимость в наш build.sbt:

Это настраивает Ehcache в качестве нашего уровня кэширования.

Если нам не нужен именно Ehcache, мы можем использовать любую другую реализацию кэша JSR-107.

7.2. Эвристика принудительного кэширования

По умолчанию Play WS не кэширует HTTP-ответы, если сервер не возвращает какую-либо конфигурацию кэширования.

play.ws.cache.heuristics.enabled=true

Чтобы обойти это, мы можем форсировать эвристическое кэширование, добавив параметр в наш application.conf: рекламируемое кэширование.

8. Дополнительная настройка

Выполнение запросов к внешней службе может потребовать некоторой настройки клиента. Нам может понадобиться обработать перенаправления, медленный сервер или некоторую фильтрацию в зависимости от заголовка пользовательского агента.

Чтобы решить эту проблему, мы можем настроить наш WS-клиент, используя свойства в нашем application.conf:

play.ws.followRedirects=false
play.ws.useragent=MyPlayApplication
play.ws.compressionEnabled=true
# time to wait for the connection to be established
play.ws.timeout.connection=30
# time to wait for data after the connection is open
play.ws.timeout.idle=30
# max time available to complete the request
play.ws.timeout.request=300

Также можно напрямую настроить базовый AsyncHttpClient.

Полный список доступных свойств можно посмотреть в исходном коде AhcConfig.

9. Заключение

«В этой статье мы изучили библиотеку Play WS и ее основные функции. Мы настроили наш проект, научились запускать общие запросы и обрабатывать их ответы как синхронно, так и асинхронно.

Мы работали с загрузками больших объемов данных и увидели, как сократить длительные операции.

Наконец, мы рассмотрели кэширование для повышения производительности и способы настройки клиента.

Как всегда, исходный код этого руководства доступен на GitHub.