«1. Обзор

В этом руководстве мы увидим, как мы можем реализовать API-интерфейсы на основе Server-Sent-Events с помощью Spring.

Проще говоря, Server-Sent-Events или сокращенно SSE — это стандарт HTTP, который позволяет веб-приложению обрабатывать однонаправленный поток событий и получать обновления всякий раз, когда сервер отправляет данные.

Версия Spring 4.2 уже поддерживала его, но, начиная с Spring 5, у нас теперь есть более идиоматический и удобный способ справиться с ним.

2. SSE с Spring 5 Webflux

Чтобы достичь этого, мы можем использовать такие реализации, как класс Flux, предоставляемый библиотекой Reactor, или, возможно, объект ServerSentEvent, который дает нам контроль над метаданными событий.

2.1. Потоковые события с использованием Flux

Flux — это реактивное представление потока событий — он обрабатывается по-разному в зависимости от указанного типа носителя запроса или ответа.

Чтобы создать конечную точку потоковой передачи SSE, мы должны следовать спецификациям W3C и обозначить ее MIME-тип как text/event-stream:

@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamFlux() {
    return Flux.interval(Duration.ofSeconds(1))
      .map(sequence -> "Flux - " + LocalTime.now().toString());
}

Метод interval создает Flux, который постепенно выдает длинные значения. Затем мы сопоставляем эти значения с желаемым результатом.

Давайте запустим наше приложение и попробуем его, просмотрев конечную точку.

Посмотрим, как браузер отреагирует на события, отправляемые сервером каждую секунду. Для получения дополнительной информации о Flux и ядре Reactor мы можем прочитать этот пост.

2.2. Использование элемента ServerSentEvent

Теперь мы обернем нашу выходную строку в объект ServerSentSevent и рассмотрим преимущества этого:

@GetMapping("/stream-sse")
public Flux<ServerSentEvent<String>> streamEvents() {
    return Flux.interval(Duration.ofSeconds(1))
      .map(sequence -> ServerSentEvent.<String> builder()
        .id(String.valueOf(sequence))
          .event("periodic-event")
          .data("SSE - " + LocalTime.now().toString())
          .build());
}

Как мы можем оценить, есть несколько преимуществ использования сущность ServerSentEvent:

  1. we can handle the events metadata, which we’d need in a real case scenario
  2. we can ignore “text/event-stream” media type declaration

В этом случае мы указали идентификатор, имя события и, самое главное, фактические данные события.

Кроме того, мы могли бы добавить атрибут comments и значение повторной попытки, которое будет указывать время повторного подключения, которое будет использоваться при попытке отправить событие.

2.3. Использование событий, отправленных сервером, с помощью веб-клиента

Теперь давайте воспользуемся нашим потоком событий с помощью веб-клиента. возникает ошибка, и когда потоковая передача завершена.

public void consumeServerSentEvent() {
    WebClient client = WebClient.create("http://localhost:8080/sse-server");
    ParameterizedTypeReference<ServerSentEvent<String>> type
     = new ParameterizedTypeReference<ServerSentEvent<String>>() {};

    Flux<ServerSentEvent<String>> eventStream = client.get()
      .uri("/stream-sse")
      .retrieve()
      .bodyToFlux(type);

    eventStream.subscribe(
      content -> logger.info("Time: {} - event: name[{}], id [{}], content[{}] ",
        LocalTime.now(), content.event(), content.id(), content.data()),
      error -> logger.error("Error receiving SSE: {}", error),
      () -> logger.info("Completed!!!"));
}

В нашем примере мы использовали метод извлечения, который представляет собой простой и понятный способ получения тела ответа.

Этот метод автоматически генерирует исключение WebClientResponseException, если мы получаем ответ 4xx или 5xx, если мы не обрабатываем сценарии, добавляющие оператор onStatus.

С другой стороны, мы могли бы также использовать метод обмена, который предоставляет доступ к ClientResponse, а также не сигнализирует об ошибках при неудачных ответах.

Мы должны учитывать, что мы можем обойти оболочку ServerSentEvent, если нам не нужны метаданные события.

3. Потоковая передача SSE в Spring MVC

Как мы уже говорили, спецификация SSE поддерживалась с версии Spring 4.2, когда был представлен класс SseEmitter.

Проще говоря, мы определим ExecutorService, поток, в котором SseEmitter будет выполнять свою работу, отправляя данные, и возвращать экземпляр эмиттера, сохраняя соединение открытым следующим образом:

Всегда следите за тем, чтобы выберите правильный ExecutorService для вашего сценария использования.

@GetMapping("/stream-sse-mvc")
public SseEmitter streamSseMvc() {
    SseEmitter emitter = new SseEmitter();
    ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
    sseMvcExecutor.execute(() -> {
        try {
            for (int i = 0; true; i++) {
                SseEventBuilder event = SseEmitter.event()
                  .data("SSE MVC - " + LocalTime.now().toString())
                  .id(String.valueOf(i))
                  .name("sse event - mvc");
                emitter.send(event);
                Thread.sleep(1000);
            }
        } catch (Exception ex) {
            emitter.completeWithError(ex);
        }
    });
    return emitter;
}

Мы можем узнать больше о SSE в Spring MVC и посмотреть другие примеры, прочитав этот интересный учебник.

4. Понимание событий, отправленных сервером

Теперь, когда мы знаем, как реализовывать конечные точки SSE, давайте попробуем немного углубиться, разобравшись в некоторых основных концепциях.

SSE — это спецификация, принятая большинством браузеров для однонаправленной потоковой передачи событий в любое время.

«События» — это просто поток текстовых данных в кодировке UTF-8, которые следуют формату, определенному спецификацией.

Этот формат состоит из ряда элементов ключ-значение (id, повтор, данные и событие, которое указывает имя), разделенных разрывами строк.

Комментарии также поддерживаются.

«Спецификация никоим образом не ограничивает формат полезных данных; мы можем использовать простую строку или более сложную структуру JSON или XML.

Последний момент, который мы должны принять во внимание, — это разница между использованием потоковой передачи SSE и WebSockets.

В то время как WebSockets предлагают полнодуплексную (двунаправленную) связь между сервером и клиентом, а SSE использует однонаправленную связь.

Кроме того, WebSockets не является HTTP-протоколом и, в отличие от SSE, не предлагает стандартов обработки ошибок.

5. Заключение

Подводя итог, в этой статье мы изучили основные концепции потоковой передачи SSE, которая, несомненно, является отличным ресурсом, который позволит нам создавать системы следующего поколения.

Теперь у нас есть отличная возможность понять, что происходит под капотом, когда мы используем этот протокол.

Кроме того, мы дополнили теорию несколькими простыми примерами, которые можно найти в нашем репозитории на Github.

«