«1. Обзор
Server-Sent Events (SSE) — это спецификация, основанная на HTTP, которая обеспечивает способ установления продолжительного одноканального соединения между сервером и клиентом.
Клиент инициирует соединение SSE, используя тип медиатекста/поток событий в заголовке Accept.
Позже он обновляется автоматически без запроса сервера.
Мы можем проверить более подробную информацию о спецификации на официальной спецификации.
В этом руководстве мы познакомим вас с новой реализацией SSE JAX-RS 2.1.
Итак, мы рассмотрим, как мы можем публиковать события с помощью JAX-RS Server API. Кроме того, мы рассмотрим, как мы можем использовать их либо с помощью клиентского API JAX-RS, либо просто с помощью HTTP-клиента, такого как инструмент curl.
2. Понимание событий SSE
Событие SSE — это блок текста, состоящий из следующих полей:
-
Событие: тип события. Сервер может отправлять множество сообщений разных типов, а клиент может прослушивать только сообщения определенного типа или обрабатывать по-разному каждый тип события. Данные: сообщение, отправленное сервером. У нас может быть много строк данных для одного и того же идентификатора события: идентификатор события, используемый для отправки заголовка Last-Event-ID после повторной попытки подключения. Это полезно, так как может предотвратить отправку сервером уже отправленных событий Повторить: время в миллисекундах, в течение которого клиент устанавливает новое соединение, когда текущее потеряно. Последний полученный идентификатор будет автоматически отправлен через заголовок Last-Event-ID «:»: это комментарий, который игнорируется клиентом
Кроме того, два последовательных события разделяются двойным символом новой строки « ˜\\n\\n~.
Кроме того, данные в одном и том же событии могут быть записаны в несколько строк, как показано в следующем примере:
event: stock
id: 1
: price change
retry: 4000
data: {"dateTime":"2018-07-14T18:06:00.285","id":1,
data: "name":"GOOG","price":75.7119}
event: stock
id: 2
: price change
retry: 4000
data: {"dateTime":"2018-07-14T18:06:00.285","id":2,"name":"IBM","price":83.4611}
В JAX RS событие SSE абстрагируется интерфейсом SseEvent, или, точнее, двумя субинтерфейсами OutboundSseEvent и InboundSseEvent.
В то время как OutboundSseEvent используется в API-интерфейсе сервера и разрабатывает отправленное событие, InboundSseEvent используется API-интерфейсом клиента и абстрагирует полученное событие.
3. Публикация событий SSE
Теперь, когда мы обсудили, что такое событие SSE, давайте посмотрим, как мы можем создать и отправить его клиенту HTTP.
3.1. Настройка проекта
У нас уже есть руководство по настройке проекта Maven на основе JAX RS. Не стесняйтесь заглянуть туда, чтобы узнать, как установить зависимости и начать работу с JAX RS.
3.2. Метод ресурса SSE
Метод ресурса SSE — это метод JAX RS, который:
-
Может создавать тип мультимедиа text/event-stream Имеет введенный параметр SseEventSink, куда отправляются события Может также иметь внедренный параметр Sse, который используется в качестве точки входа для создания построителя событий
@GET
@Path("prices")
@Produces("text/event-stream")
public void getStockPrices(@Context SseEventSink sseEventSink, @Context Sse sse) {
//...
}
Следовательно, клиент должен сделать первый запрос HTTP со следующим заголовком HTTP:
Accept: text/event-stream
3.3. Экземпляр SSE
Экземпляр SSE — это контекстный компонент, который JAX RS Runtime сделает доступным для внедрения.
Мы могли бы использовать его как фабрику для создания:
-
OutboundSseEvent.Builder — позволяет нам создавать события, а затем SseBroadcaster — позволяет нам транслировать события нескольким подписчикам
Давайте посмотрим, как это работает: ~ ~~
@Context
public void setSse(Sse sse) {
this.sse = sse;
this.eventBuilder = sse.newEventBuilder();
this.sseBroadcaster = sse.newBroadcaster();
}
Теперь давайте сосредоточимся на построителе событий. OutboundSseEvent.Builder отвечает за создание OutboundSseEvent:
OutboundSseEvent sseEvent = this.eventBuilder
.name("stock")
.id(String.valueOf(lastEventId))
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.data(Stock.class, stock)
.reconnectDelay(4000)
.comment("price change")
.build();
Как мы видим, у построителя есть методы для установки значений для всех полей событий, показанных выше. Кроме того, метод mediaType() используется для сериализации объекта Java поля данных в подходящий текстовый формат.
По умолчанию тип носителя для поля данных — text/plain. Следовательно, его не нужно указывать явно при работе с типом данных String.
В противном случае, если мы хотим обработать пользовательский объект, нам нужно указать тип носителя или предоставить собственный MessageBodyWriter. Среда выполнения JAX RS предоставляет MessageBodyWriters для наиболее известных типов мультимедиа.
Экземпляр Sse также имеет два ярлыка для создания события только с полем данных или с полем типа и данными:
OutboundSseEvent sseEvent = sse.newEvent("cool Event");
OutboundSseEvent sseEvent = sse.newEvent("typed event", "data Event");
3.4. Отправка простого события
«Теперь, когда мы знаем, как создавать события, и понимаем, как работает ресурс SSE. Отправим простое событие.
Интерфейс SseEventSink абстрагирует одно HTTP-соединение. Среда выполнения JAX-RS может сделать его доступным только путем внедрения в метод ресурса SSE.
Отправка события так же проста, как вызов SseEventSink.send().
В следующем примере будет отправлено множество обновлений акций и в конечном итоге закроется поток событий:
@GET
@Path("prices")
@Produces("text/event-stream")
public void getStockPrices(@Context SseEventSink sseEventSink /*..*/) {
int lastEventId = //..;
while (running) {
Stock stock = stockService.getNextTransaction(lastEventId);
if (stock != null) {
OutboundSseEvent sseEvent = this.eventBuilder
.name("stock")
.id(String.valueOf(lastEventId))
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.data(Stock.class, stock)
.reconnectDelay(3000)
.comment("price change")
.build();
sseEventSink.send(sseEvent);
lastEventId++;
}
//..
}
sseEventSink.close();
}
После отправки всех событий сервер закрывает соединение, либо явно вызывая метод close(), либо, что предпочтительнее, , используя try-with-resource, поскольку SseEventSink расширяет интерфейс AutoClosable:
try (SseEventSink sink = sseEventSink) {
OutboundSseEvent sseEvent = //..
sink.send(sseEvent);
}
В нашем примере приложения мы можем увидеть, как это работает, если мы посетим:
http://localhost:9080/sse-jaxrs-server/sse.html
3.5. Широковещательные события
Широковещательная рассылка — это процесс, посредством которого события одновременно отправляются нескольким клиентам. Это достигается с помощью SseBroadcaster API и выполняется в три простых шага:
Сначала мы создаем объект SseBroadcaster из внедренного контекста Sse, как показано ранее:
SseBroadcaster sseBroadcaster = sse.newBroadcaster();
Затем клиенты должны подписаться, чтобы иметь возможность получать события Sse. Обычно это делается в методе ресурсов SSE, где вводится экземпляр контекста SseEventSink:
@GET
@Path("subscribe")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void listen(@Context SseEventSink sseEventSink) {
this.sseBroadcaster.register(sseEventSink);
}
И, наконец, мы можем инициировать публикацию события, вызвав метод широковещательной рассылки():
@GET
@Path("publish")
public void broadcast() {
OutboundSseEvent sseEvent = //...;
this.sseBroadcaster.broadcast(sseEvent);
}
Это отправит одно и то же событие для каждого зарегистрированного SseEventSink.
Чтобы продемонстрировать трансляцию, мы можем получить доступ к этому URL-адресу:
http://localhost:9080/sse-jaxrs-server/sse-broadcast.html
Затем мы можем запустить трансляцию, вызвав метод ресурса Broadcast():
curl -X GET http://localhost:9080/sse-jaxrs-server/sse/stock/publish
4. Использование событий SSE
Чтобы использовать событие SSE, отправленное сервером, мы можем использовать любой HTTP-клиент, но в этом руководстве мы будем использовать клиентский API JAX RS.
4.1. Клиентский API JAX RS для SSE
Чтобы начать работу с клиентским API для SSE, нам необходимо предоставить зависимости для реализации клиента JAX RS.
Здесь мы будем использовать реализацию клиента Apache CXF:
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-rs-client</artifactId>
<version>${cxf-version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-rs-sse</artifactId>
<version>${cxf-version}</version>
</dependency>
SseEventSource является сердцем этого API и создан на основе WebTarget.
Начнем с прослушивания входящих событий, которые абстрагируются интерфейсом InboundSseEvent:
Client client = ClientBuilder.newClient();
WebTarget target = client.target(url);
try (SseEventSource source = SseEventSource.target(target).build()) {
source.register((inboundSseEvent) -> System.out.println(inboundSseEvent));
source.open();
}
После установления соединения зарегистрированный потребитель событий будет вызываться для каждого полученного InboundSseEvent.
Затем мы можем использовать метод readData() для чтения исходной строки данных:
String data = inboundSseEvent.readData();
Или мы можем использовать перегруженную версию для получения десериализованного объекта Java, используя подходящий тип носителя:
Stock stock = inboundSseEvent.readData(Stock.class, MediaType.Application_Json);
Здесь мы только что предоставили простой потребитель событий, который выводит входящее событие в консоль.
5. Заключение
В этом руководстве мы сосредоточились на том, как использовать события, отправленные сервером, в JAX RS 2.1. Мы предоставили пример, демонстрирующий, как отправлять события одному клиенту, а также как транслировать события нескольким клиентам.
Наконец, мы использовали эти события с помощью клиентского API JAX-RS.
Как обычно, код этого туториала можно найти на Github.