«1. Обзор

Когда мы хотим, чтобы наши веб-клиенты поддерживали диалог с нашим сервером, WebSockets может быть полезным решением. WebSockets поддерживают постоянное полнодуплексное соединение. Это дает нам возможность отправлять двунаправленные сообщения между нашим сервером и клиентом.

В этом уроке мы узнаем, как использовать WebSockets с Akka в Play Framework.

2. Настройка

Давайте настроим простое приложение для чата. Пользователь будет отправлять сообщения на сервер, а сервер ответит сообщением от JSONPlaceholder.

2.1. Настройка приложения Play Framework

Мы создадим это приложение, используя Play Framework.

Давайте следуем инструкциям из раздела Введение в Play на Java, чтобы настроить и запустить простое приложение Play Framework.

2.2. Добавление необходимых файлов JavaScript

Кроме того, нам потребуется работать с JavaScript для сценариев на стороне клиента. Это позволит нам получать новые сообщения, отправленные с сервера. Для этого мы будем использовать библиотеку jQuery.

Добавим jQuery в конец файла app/views/index.scala.html:

<script src="https://code.jquery.com/jquery-3.4.1.min.js"></script>

2.3. Настройка Akka

Наконец, мы будем использовать Akka для обработки соединений WebSocket на стороне сервера.

Давайте перейдем к файлу build.sbt и добавим зависимости.

Нам нужно добавить зависимости akka-actor и akka-testkit:

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % akkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % akkaVersion

Нам нужно, чтобы они могли использовать и тестировать код Akka Framework.

Далее мы будем использовать потоки Akka. Итак, давайте добавим зависимость akka-stream:

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % akkaVersion

Наконец, нам нужно вызвать конечную точку отдыха из актора Akka. Для этого нам понадобится зависимость akka-http. Когда мы это сделаем, конечная точка вернет данные JSON, которые нам придется десериализовать, поэтому нам также нужно добавить зависимость akka-http-jackson:

libraryDependencies += "com.typesafe.akka" %% "akka-http-jackson" % akkaHttpVersion
libraryDependencies += "com.typesafe.akka" %% "akka-http" % akkaHttpVersion

И теперь все готово. Давайте посмотрим, как заставить работать WebSockets!

3. Обработка веб-сокетов с помощью акторов Akka

Механизм обработки веб-сокетов в Play построен вокруг потоков Akka. WebSocket моделируется как поток. Таким образом, входящие сообщения WebSocket передаются в поток, а сообщения, созданные потоком, отправляются клиенту.

Для обработки WebSocket с помощью Актера нам понадобится утилита Play ActorFlow, которая преобразует ActorRef в поток. В основном для этого требуется некоторый код Java с небольшой настройкой.

3.1. Метод контроллера WebSocket

Во-первых, нам нужен экземпляр Materializer. Materializer — это фабрика для двигателей потокового исполнения.

Нам нужно внедрить ActorSystem и Materializer в контроллер app/controllers/HomeController.java:

private ActorSystem actorSystem;
private Materializer materializer;

@Inject
public HomeController(
  ActorSystem actorSystem, Materializer materializer) {
    this.actorSystem = actorSystem;
    this.materializer = materializer;
}

Теперь добавим метод контроллера сокета:

public WebSocket socket() {
    return WebSocket.Json
      .acceptOrResult(this::createActorFlow);
}

Здесь мы вызываем функцию acceptOrResult, который принимает заголовок запроса и возвращает будущее. Возвращаемое будущее — это поток для обработки сообщений WebSocket.

Вместо этого мы можем отклонить запрос и вернуть результат отклонения.

Теперь давайте создадим поток:

private CompletionStage<F.Either<Result, Flow<JsonNode, JsonNode, ?>>> 
  createActorFlow(Http.RequestHeader request) {
    return CompletableFuture.completedFuture(
      F.Either.Right(createFlowForActor()));
}

Класс F в Play Framework определяет набор помощников в стиле функционального программирования. В этом случае мы используем F.Either.Right, чтобы принять соединение и вернуть поток.

Допустим, мы хотим отклонить соединение, когда клиент не аутентифицирован.

Для этого мы могли бы проверить, установлено ли имя пользователя в сеансе. А если это не так, мы отклоняем соединение с HTTP 403 Forbidden:

private CompletionStage<F.Either<Result, Flow<JsonNode, JsonNode, ?>>> 
  createActorFlow2(Http.RequestHeader request) {
    return CompletableFuture.completedFuture(
      request.session()
      .getOptional("username")
      .map(username -> 
        F.Either.<Result, Flow<JsonNode, JsonNode, ?>>Right(
          createFlowForActor()))
      .orElseGet(() -> F.Either.Left(forbidden())));
}

Мы используем F.Either.Left для отклонения соединения так же, как мы предоставляем поток с F.Either.Right.

Наконец, мы связываем поток с актором, который будет обрабатывать сообщения:

private Flow<JsonNode, JsonNode, ?> createFlowForActor() {
    return ActorFlow.actorRef(out -> Messenger.props(out), 
      actorSystem, materializer);
}

ActorFlow.actorRef создает поток, который обрабатывается актором Messenger.

3.2. Файл маршрутов

Теперь давайте добавим определения маршрутов для методов контроллера в conf/routes:

GET  /                    controllers.HomeController.index(request: Request)
GET  /chat                controllers.HomeController.socket
GET  /chat/with/streams   controllers.HomeController.akkaStreamsSocket
GET  /assets/*file        controllers.Assets.versioned(path="/public", file: Asset)

Эти определения маршрутов сопоставляют входящие HTTP-запросы с методами действия контроллера, как описано в разделе Маршрутизация в приложениях Play на Java.

3.3. Реализация актора

Наиболее важной частью класса актора является метод createReceive, который определяет, какие сообщения может обрабатывать актор:

@Override
public Receive createReceive() {
    return receiveBuilder()
      .match(JsonNode.class, this::onSendMessage)
      .matchAny(o -> log.error("Received unknown message: {}", o.getClass()))
      .build();
}

«

private void onSendMessage(JsonNode jsonNode) {
    RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode);
    String message = requestDTO.getMessage().toLowerCase();
    //..
    processMessage(requestDTO);
}

«Актер будет пересылать все сообщения, соответствующие классу JsonNode, в метод обработчика onSendMessage:

private void processMessage(RequestDTO requestDTO) {
    CompletionStage<HttpResponse> responseFuture = getRandomMessage();
    responseFuture.thenCompose(this::consumeHttpResponse)
      .thenAccept(messageDTO ->
        out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf()));
}

Затем обработчик будет отвечать на каждое сообщение, используя метод processMessage:

3.4. Использование Rest API с Akka HTTP

Мы будем отправлять HTTP-запросы фиктивному генератору сообщений в JSONPlaceholder Posts. Когда приходит ответ, мы отправляем ответ клиенту, записывая его.

private CompletionStage<HttpResponse> getRandomMessage() {
    int postId = ThreadLocalRandom.current().nextInt(0, 100);
    return Http.get(getContext().getSystem())
      .singleRequest(HttpRequest.create(
        "https://jsonplaceholder.typicode.com/posts/" + postId));
}

Давайте создадим метод, который вызывает конечную точку со случайным идентификатором сообщения:

private CompletionStage<MessageDTO> consumeHttpResponse(
  HttpResponse httpResponse) {
    Materializer materializer = 
      Materializer.matFromSystem(getContext().getSystem());
    return Jackson.unmarshaller(MessageDTO.class)
      .unmarshal(httpResponse.entity(), materializer)
      .thenApply(messageDTO -> {
          log.info("Received message: {}", messageDTO);
          discardEntity(httpResponse, materializer);
          return messageDTO;
      });
}

Мы также обрабатываем HttpResponse, полученный от вызова службы, чтобы получить ответ JSON:

public static MessageDTO jsonNodeToMessage(JsonNode jsonNode) {
    ObjectMapper mapper = new ObjectMapper();
    return mapper.convertValue(jsonNode, MessageDTO.class);
}

Класс MessageConverter — это утилита для преобразования между JsonNode и DTO:

Далее нам нужно отбросить сущность. Удобный метод discardEntityBytes служит для простого удаления объекта, если он не имеет для нас никакой цели.

private void discardEntity(
  HttpResponse httpResponse, Materializer materializer) {
    HttpMessage.DiscardedEntity discarded = 
      httpResponse.discardEntityBytes(materializer);
    discarded.completionStage()
      .whenComplete((done, ex) -> 
        log.info("Entity discarded completely!"));
}

Давайте посмотрим, как отбрасывать байты:

Теперь, выполнив обработку WebSocket, давайте посмотрим, как мы можем настроить клиент для этого, используя HTML5 WebSockets.

4. Настройка клиента WebSocket

Для нашего клиента давайте создадим простое веб-приложение для чата.

4.1. Действие контроллера

public Result index(Http.Request request) {
    String url = routes.HomeController.socket()
      .webSocketURL(request);
    return ok(views.html.index.render(url));
}

Нам нужно определить действие контроллера, которое отображает индексную страницу. Мы поместим это в класс контроллера app.controllers.HomeController:

4.2. Страница шаблона

<div id="messageContent"></div>F
<form>
    <textarea id="messageInput"></textarea>
    <button id="sendButton">Send</button>
</form>

Теперь давайте перейдем на страницу app/views/ndex.scala.html и добавим контейнер для полученных сообщений и форму для захвата нового сообщения:

@(url: String)

Мы также необходимо передать URL-адрес действия контроллера WebSocket, объявив этот параметр в верхней части страницы app/views/index.scala.html:

4.3. Обработчики событий WebSocket в JavaScript

Теперь мы можем добавить JavaScript для обработки событий WebSocket. Для простоты мы добавим функции JavaScript внизу страницы app/views/index.scala.html.

var webSocket;
var messageInput;

function init() {
    initWebSocket();
}

function initWebSocket() {
    webSocket = new WebSocket("@url");
    webSocket.onopen = onOpen;
    webSocket.onclose = onClose;
    webSocket.onmessage = onMessage;
    webSocket.onerror = onError;
}

Объявим обработчики событий:

function onOpen(evt) {
    writeToScreen("CONNECTED");
}

function onClose(evt) {
    writeToScreen("DISCONNECTED");
}

function onError(evt) {
    writeToScreen("ERROR: " + JSON.stringify(evt));
}

function onMessage(evt) {
    var receivedData = JSON.parse(evt.data);
    appendMessageToView("Server", receivedData.body);
}

Добавим сами обработчики:

function appendMessageToView(title, message) {
    $("#messageContent").append("<p>" + title + ": " + message + "</p>");
}

function writeToScreen(message) {
    console.log("New message: ", message);
}

Затем для представления вывода воспользуемся функциями appendMessageToView и writeToScreen:

4.4. Запуск и тестирование приложения

cd websockets
sbt run

Мы готовы протестировать приложение, поэтому давайте запустим его:

Когда приложение запущено, мы можем общаться с сервером, посетив http://localhost:9000: ~ ~~ Каждый раз, когда мы набираем сообщение и нажимаем «Отправить», сервер немедленно отвечает некоторым lorem ipsum из службы JSON Placeholder.

5. Непосредственная обработка WebSockets с помощью потоков Akka

Если мы обрабатываем поток событий из источника и отправляем их клиенту, то мы можем смоделировать это на основе потоков Akka.

Давайте посмотрим, как мы можем использовать потоки Akka на примере, когда сервер отправляет сообщения каждые две секунды.

Начнем с действия WebSocket в HomeController:

public WebSocket akkaStreamsSocket() {
    return WebSocket.Json.accept(request -> {
        Sink<JsonNode, ?> in = Sink.foreach(System.out::println);
        MessageDTO messageDTO = 
          new MessageDTO("1", "1", "Title", "Test Body");
        Source<JsonNode, ?> out = Source.tick(
          Duration.ofSeconds(2),
          Duration.ofSeconds(2),
          MessageConverter.messageToJsonNode(messageDTO)
        );
        return Flow.fromSinkAndSource(in, out);
    });
}

Метод Source#tick принимает три параметра. Первая — начальная задержка перед обработкой первого тика, а вторая — интервал между последовательными тиками. Мы установили оба значения на две секунды в приведенном выше фрагменте. Третий параметр — это объект, который должен возвращаться на каждом тике.

Чтобы увидеть это в действии, нам нужно изменить URL-адрес в действии index и сделать так, чтобы он указывал на конечную точку akkaStreamsSocket:

String url = routes.HomeController.akkaStreamsSocket().webSocketURL(request);

Теперь, обновляя страницу, мы будем видеть новую запись каждые две секунды. :

6. Завершение Актера

В какой-то момент нам нужно будет закрыть чат либо по запросу пользователя, либо по тайм-ауту.

6.1. Обработка завершения актора

Как определить, что WebSocket был закрыт?

Play автоматически закроет WebSocket, когда актор, обрабатывающий WebSocket, завершится. Таким образом, мы можем справиться с этим сценарием, реализуя метод Actor#postStop:

@Override
public void postStop() throws Exception {
    log.info("Messenger actor stopped at {}",
      OffsetDateTime.now()
      .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
}

6.2. Завершение актора вручную

Кроме того, если нам нужно остановить актера, мы можем послать ему PoisonPill. В нашем примерном приложении мы должны иметь возможность обрабатывать запрос «стоп».

Давайте посмотрим, как это сделать в методе onSendMessage:

private void onSendMessage(JsonNode jsonNode) {
    RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode);
    String message = requestDTO.getMessage().toLowerCase();
    if("stop".equals(message)) {
        MessageDTO messageDTO = 
          createMessageDTO("1", "1", "Stop", "Stopping actor");
        out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf());
        self().tell(PoisonPill.getInstance(), getSelf());
    } else {
        log.info("Actor received. {}", requestDTO);
        processMessage(requestDTO);
    }
}

«

«Когда мы получаем сообщение, мы проверяем, является ли это запросом на остановку. Если это так, мы посылаем PoisonPill. В противном случае мы обрабатываем запрос.

7. Параметры конфигурации

Мы можем настроить несколько параметров для обработки WebSocket. Давайте посмотрим на несколько.

7.1. Длина кадра WebSocket

Коммуникация WebSocket включает обмен кадрами данных.

Длина кадра WebSocket настраивается. У нас есть возможность отрегулировать длину рамы в соответствии с требованиями нашего приложения.

play.server.websocket.frame.maxLength = 64k

Настройка более короткой длины кадра может помочь снизить количество атак типа «отказ в обслуживании», использующих длинные кадры данных. Мы можем изменить длину кадра для приложения, указав максимальную длину в application.conf:

sbt -Dwebsocket.frame.maxLength=64k run

Мы также можем установить эту опцию конфигурации, указав максимальную длину в качестве параметра командной строки:

7.2. Тайм-аут простоя соединения

По умолчанию актор, который мы используем для обработки WebSocket, завершается через одну минуту. Это связано с тем, что сервер Play, на котором работает наше приложение, имеет тайм-аут простоя по умолчанию, равный 60 секундам. Это означает, что все соединения, которые не получают запрос в течение шестидесяти секунд, закрываются автоматически.

play.server.http.idleTimeout = "infinite"

Мы можем изменить это с помощью параметров конфигурации. Давайте перейдем к нашему application.conf и изменим сервер, чтобы он не имел тайм-аута простоя:

sbt -Dhttp.idleTimeout=infinite run

Или мы можем передать эту опцию в качестве аргументов командной строки:

Мы также можем настроить это, указав devSettings в build.sbt.

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "infinite"

Параметры конфигурации, указанные в build.sbt, используются только в разработке, они будут игнорироваться в производстве:

Если мы перезапустим приложение, актор не завершится.

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "120 s"

Мы можем изменить значение на секунды:

Мы можем узнать больше о доступных параметрах конфигурации в документации Play Framework.

8. Заключение

В этом руководстве мы реализовали WebSockets в Play Framework с актерами Akka и потоками Akka.

Затем мы рассмотрели, как напрямую использовать актеры Akka, а затем увидели, как можно настроить Akka Streams для обработки соединения WebSocket.

На стороне клиента мы использовали JavaScript для обработки событий WebSocket.

Наконец, мы рассмотрели некоторые параметры конфигурации, которые мы можем использовать.