«1. Введение

В этом руководстве мы разберемся с основами создания реактивных систем на Java с использованием Spring и других инструментов и фреймворков.

В процессе мы обсудим, что реактивное программирование — это всего лишь средство для создания реактивной системы. Это поможет нам понять обоснование создания реактивных систем и различных спецификаций, библиотек и стандартов, которые они вдохновили на этом пути.

2. Что такое реактивные системы?

За последние несколько десятилетий в технологическом ландшафте произошло несколько потрясений, которые привели к полной трансформации наших представлений о ценности технологий. Компьютерный мир до появления Интернета и представить себе не мог, какими способами и средствами он изменит нашу сегодняшнюю жизнь.

С доступом к Интернету для широких масс и постоянно развивающимся опытом, который он обещает, архитекторы приложений должны быть начеку, чтобы удовлетворить их спрос.

По сути, это означает, что мы никогда не сможем разработать приложение так, как раньше. Высокоотзывчивое приложение больше не роскошь, а необходимость.

Это тоже касается случайных сбоев и непредсказуемой нагрузки. Потребность часа состоит не только в том, чтобы получить правильный результат, но и в том, чтобы получить его быстро! Очень важно обеспечить потрясающий пользовательский опыт, который мы обещаем предоставить.

Это то, что создает потребность в архитектурном стиле, который может дать нам реактивные системы.

2.1. Реактивный манифест

Еще в 2013 году команда разработчиков под руководством Йонаса Бонера собралась вместе, чтобы определить набор основных принципов в документе, известном как Реактивный манифест. Именно это заложило основу архитектурного стиля для создания реактивных систем. С тех пор этот манифест вызвал большой интерес со стороны сообщества разработчиков.

По сути, этот документ описывает рецепт гибкости, слабосвязанности и масштабируемости реактивной системы. Это делает такие системы простыми в разработке, устойчивыми к сбоям и, что наиболее важно, очень быстро реагирующими, что является основой для невероятного пользовательского опыта.

Так что же это за секретный рецепт? Ну, это вряд ли какой-то секрет! Манифест определяет фундаментальные характеристики или принципы реактивной системы:

    Отзывчивость: реактивная система должна обеспечивать быстрое и постоянное время отклика и, следовательно, постоянное качество обслуживания. Устойчивость: реактивная система должна оставаться реагирующей в случае случайных сбоев через репликация и изоляция Эластичность: такая система должна оставаться отзывчивой при непредсказуемых рабочих нагрузках за счет экономически эффективной масштабируемости. Управляемая сообщениями: она должна полагаться на асинхронный обмен сообщениями между компонентами системы

Эти принципы кажутся простыми и разумными, но их не всегда легко реализовать в сложной архитектуре предприятия. В этом руководстве мы разработаем пример системы на Java с учетом этих принципов!

3. Что такое реактивное программирование?

Прежде чем мы продолжим, важно понять разницу между реактивным программированием и реактивными системами. Мы используем оба эти термина довольно часто и легко неправильно понимаем одно за другое. Как мы видели ранее, реактивные системы являются результатом определенного архитектурного стиля.

Напротив, реактивное программирование — это парадигма программирования, в которой основное внимание уделяется разработке асинхронных и неблокирующих компонентов. Ядром реактивного программирования является поток данных, который мы можем наблюдать и реагировать на него, а также оказывать обратное давление. Это приводит к неблокирующему выполнению и, следовательно, к лучшей масштабируемости с меньшим количеством потоков выполнения.

Это не означает, что реактивные системы и реактивное программирование исключают друг друга. На самом деле реактивное программирование — это важный шаг к реализации реактивной системы, но это еще не все!

3.1. Реактивные потоки

«Reactive Streams — это инициатива сообщества, которая началась еще в 2013 году, чтобы предоставить стандарт для асинхронной обработки потоков с неблокирующим противодавлением. Цель здесь состояла в том, чтобы определить набор интерфейсов, методов и протоколов, которые могут описывать необходимые операции и сущности.

С тех пор появилось несколько реализаций на нескольких языках программирования, которые соответствуют спецификации реактивных потоков. К ним относятся Akka Streams, Ratpack и Vert.x и многие другие.

3.2. Реактивные библиотеки для Java

Одной из первоначальных целей реактивных потоков было включение в качестве официальной стандартной библиотеки Java. В результате спецификация реактивных потоков семантически эквивалентна библиотеке Java Flow, представленной в Java 9.

Помимо этого, есть несколько популярных вариантов реализации реактивного программирования в Java:

    Реактивные расширения: широко известные как и ReactiveX, они предоставляют API для асинхронного программирования с наблюдаемыми потоками. Они доступны для нескольких языков программирования и платформ, включая Java, где она известна как RxJava Project Reactor: это еще одна реактивная библиотека, основанная на спецификации реактивных потоков и предназначенная для создания не-приложений на JVM. Это также является основой реактивного стека в экосистеме Spring

4. Простое приложение

Для целей этого руководства мы разработаем простое приложение на основе архитектуры микросервисов с минимальным внешним интерфейсом. В архитектуре приложения должно быть достаточно элементов для создания реактивной системы.

Для нашего приложения мы примем сквозное реактивное программирование и другие шаблоны и инструменты для достижения основных характеристик реактивной системы.

4.1. Архитектура

Мы начнем с определения простой архитектуры приложения, которая не обязательно демонстрирует характеристики реактивных систем. С этого момента мы будем вносить необходимые изменения для достижения этих характеристик одну за другой.

Итак, во-первых, давайте начнем с определения простой архитектуры:

Это довольно простая архитектура, в которой есть набор микросервисов, облегчающих коммерческий вариант использования, где мы можем разместить заказ. У него также есть интерфейс для взаимодействия с пользователем, и все общение происходит как REST через HTTP. Более того, каждый микросервис управляет своими данными в отдельных базах данных, что называется «база данных для каждого сервиса».

Мы продолжим и создадим это простое приложение в следующих подразделах. Это будет нашей основой для понимания ошибочности этой архитектуры, а также способов и средств принятия принципов и практик, чтобы мы могли преобразовать ее в реактивную систему.

4.3. Микросервис инвентаризации

Микросервис инвентаризации будет отвечать за управление списком продуктов и их текущим запасом. Это также позволит изменять запас по мере обработки заказов. Мы будем использовать Spring Boot с MongoDB для разработки этой службы.

Давайте начнем с определения контроллера для предоставления некоторых конечных точек:

@GetMapping
public List<Product> getAllProducts() {
    return productService.getProducts();
}
 
@PostMapping
public Order processOrder(@RequestBody Order order) {
    return productService.handleOrder(order);
}
 
@DeleteMapping
public Order revertOrder(@RequestBody Order order) {
    return productService.revertOrder(order);
}

и службы для инкапсуляции нашей бизнес-логики:

@Transactional
public Order handleOrder(Order order) {       
    order.getLineItems()
      .forEach(l -> {
          Product> p = productRepository.findById(l.getProductId())
            .orElseThrow(() -> new RuntimeException("Could not find the product: " + l.getProductId()));
          if (p.getStock() >= l.getQuantity()) {
              p.setStock(p.getStock() - l.getQuantity());
              productRepository.save(p);
          } else {
              throw new RuntimeException("Product is out of stock: " + l.getProductId());
          }
      });
    return order.setOrderStatus(OrderStatus.SUCCESS);
}

@Transactional
public Order revertOrder(Order order) {
    order.getLineItems()
      .forEach(l -> {
          Product p = productRepository.findById(l.getProductId())
            .orElseThrow(() -> new RuntimeException("Could not find the product: " + l.getProductId()));
          p.setStock(p.getStock() + l.getQuantity());
          productRepository.save(p);
      });
    return order.setOrderStatus(OrderStatus.SUCCESS);
}

Обратите внимание, что мы сохраняем сущности внутри транзакции, что гарантирует, что в случае исключений не возникает несогласованного состояния.

Помимо этого, нам также нужно определить объекты домена, интерфейс репозитория и кучу классов конфигурации, необходимых для правильной работы.

Но поскольку они в основном шаблонные, мы не будем их рассматривать, и на них можно сослаться в репозитории GitHub, представленном в последнем разделе этой статьи.

4.4. Микросервис доставки

Микросервис доставки также не будет сильно отличаться. Он будет отвечать за проверку возможности создания отгрузки для заказа и создание ее, если это возможно.

Как и раньше, мы определим контроллер для предоставления наших конечных точек, фактически только одну конечную точку:

@PostMapping
public Order process(@RequestBody Order order) {
    return shippingService.handleOrder(order);
}

и сервис для инкапсуляции бизнес-логики, связанной с доставкой заказа:

public Order handleOrder(Order order) {
    LocalDate shippingDate = null;
    if (LocalTime.now().isAfter(LocalTime.parse("10:00"))
      && LocalTime.now().isBefore(LocalTime.parse("18:00"))) {
        shippingDate = LocalDate.now().plusDays(1);
    } else {
        throw new RuntimeException("The current time is off the limits to place order.");
    }
    shipmentRepository.save(new Shipment()
      .setAddress(order.getShippingAddress())
      .setShippingDate(shippingDate));
    return order.setShippingDate(shippingDate)
      .setOrderStatus(OrderStatus.SUCCESS);
}

«

«Наша простая служба доставки просто проверяет действительное временное окно для размещения заказов. Мы не будем обсуждать остальную часть шаблонного кода, как и раньше.

4.5. Микросервис заказов

Наконец, мы определим микросервис заказов, который помимо всего прочего будет отвечать за создание нового заказа. Интересно, что он также будет выступать в качестве службы оркестрации, где он будет связываться со службой инвентаризации и службой доставки заказа.

@PostMapping
public Order create(@RequestBody Order order) {
    Order processedOrder = orderService.createOrder(order);
    if (OrderStatus.FAILURE.equals(processedOrder.getOrderStatus())) {
        throw new RuntimeException("Order processing failed, please try again later.");
    }
    return processedOrder;
}
@GetMapping
public List<Order> getAll() {
    return orderService.getOrders();
}

Давайте определим наш контроллер с необходимыми конечными точками:

public Order createOrder(Order order) {
    boolean success = true;
    Order savedOrder = orderRepository.save(order);
    Order inventoryResponse = null;
    try {
        inventoryResponse = restTemplate.postForObject(
          inventoryServiceUrl, order, Order.class);
    } catch (Exception ex) {
        success = false;
    }
    Order shippingResponse = null;
    try {
        shippingResponse = restTemplate.postForObject(
          shippingServiceUrl, order, Order.class);
    } catch (Exception ex) {
        success = false;
        HttpEntity<Order> deleteRequest = new HttpEntity<>(order);
        ResponseEntity<Order> deleteResponse = restTemplate.exchange(
          inventoryServiceUrl, HttpMethod.DELETE, deleteRequest, Order.class);
    }
    if (success) {
        savedOrder.setOrderStatus(OrderStatus.SUCCESS);
        savedOrder.setShippingDate(shippingResponse.getShippingDate());
    } else {
        savedOrder.setOrderStatus(OrderStatus.FAILURE);
    }
    return orderRepository.save(savedOrder);
}

public List<Order> getOrders() {
    return orderRepository.findAll();
}

И сервис для инкапсуляции бизнес-логики, связанной с заказами: услуги доставки далеки от идеала. Распределенные транзакции с несколькими микросервисами — сложная тема сама по себе и выходит за рамки этого руководства.

Однако позже в этом руководстве мы увидим, как реактивная система может в определенной степени избежать необходимости в распределенных транзакциях.

Как и прежде, мы не будем рассматривать остальную часть стандартного кода. Однако на это можно ссылаться в репозитории GitHub.

4.6. Интерфейс

Давайте также добавим пользовательский интерфейс, чтобы завершить обсуждение. Пользовательский интерфейс будет основан на Angular и будет представлять собой простое одностраничное приложение.

Нам нужно создать простой компонент в Angular для управления созданием и получением заказов. Особое значение имеет та часть, где мы вызываем наш API для создания заказа:

createOrder() {
    let headers = new HttpHeaders({'Content-Type': 'application/json'});
    let options = {headers: headers}
    this.http.post('http://localhost:8080/api/orders', this.form.value, options)
      .subscribe(
        (response) => {
          this.response = response
        },
        (error) => {
          this.error = error
        }
      )
}

Приведенный выше фрагмент кода предполагает, что данные заказа будут записаны в форме и доступны в рамках компонента. Angular предлагает фантастическую поддержку для создания простых и сложных форм с использованием реактивных форм и форм, управляемых шаблонами.

Также важна часть, где мы получаем ранее созданные заказы:

getOrders() {
  this.previousOrders = this.http.get(''http://localhost:8080/api/orders'')
}

Обратите внимание, что HTTP-модуль Angular является асинхронным по своей природе и, следовательно, возвращает RxJS Observables. Мы можем обработать ответ в нашем представлении, передав его через асинхронный канал:

<div class="container" *ngIf="previousOrders !== null">
  <h2>Your orders placed so far:</h2>
  <ul>
    <li *ngFor="let order of previousOrders | async">
      <p>Order ID: {{ order.id }}, Order Status: {{order.orderStatus}}, Order Message: {{order.responseMessage}}</p>
    </li>
  </ul>
</div>

Конечно, для работы Angular потребуются шаблоны, стили и конфигурации, но на них можно ссылаться в репозитории GitHub. Обратите внимание, что здесь мы объединили все в один компонент, что в идеале не следует делать.

Но в рамках данного руководства эти проблемы не рассматриваются.

4.7. Развертывание приложения

Теперь, когда мы создали все отдельные части приложения, как нам их развернуть? Ну, мы всегда можем сделать это вручную. Но мы должны быть осторожны, так как вскоре это может стать утомительным.

В этом руководстве мы будем использовать Docker Compose для сборки и развертывания нашего приложения на машине Docker. Для этого нам потребуется добавить стандартный файл Dockerfile в каждую службу и создать файл Docker Compose для всего приложения.

Давайте посмотрим, как выглядит этот файл docker-compose.yml:

version: '3'
services:
  frontend:
    build: ./frontend
    ports:
      - "80:80"
  order-service:
    build: ./order-service
    ports:
      - "8080:8080"
  inventory-service:
    build: ./inventory-service
    ports:
      - "8081:8081"
  shipping-service:
    build: ./shipping-service
    ports:
      - "8082:8082"

Это довольно стандартное определение сервисов в Docker Compose и не требует особого внимания.

4.8. Проблемы с этой архитектурой

Теперь, когда у нас есть простое приложение с несколькими службами, взаимодействующими друг с другом, мы можем обсудить проблемы этой архитектуры. Есть то, что мы попытаемся рассмотреть в следующих разделах и в конечном итоге дойдем до состояния, в котором мы преобразовали бы наше приложение в реактивную систему!

Хотя это приложение далеко от программного обеспечения производственного уровня и есть несколько проблем, мы сосредоточимся на проблемах, которые относятся к мотивам реактивных систем:

    Сбой либо в службе инвентаризации, либо в службе доставки может иметь каскадный эффект Все обращения к внешним системам и базе данных носят блокирующий характер Развертывание не может автоматически обрабатывать сбои и колебания нагрузки

5. Реактивное программирование

«Блокирование вызовов в любой программе часто приводит к тому, что важные ресурсы просто ждут, пока что-то произойдет. К ним относятся вызовы базы данных, вызовы веб-служб и вызовы файловой системы. Если мы сможем освободить потоки выполнения от этого ожидания и предоставим механизм возврата после получения результатов, это приведет к гораздо лучшему использованию ресурсов.

Вот что дает нам принятие парадигмы реактивного программирования. Хотя для многих из этих вызовов можно переключиться на реактивную библиотеку, это возможно не для всех. Для нас, к счастью, Spring значительно упрощает использование реактивного программирования с MongoDB и REST API:

Spring Data Mongo поддерживает реактивный доступ через Java-драйвер MongoDB Reactive Streams. Он предоставляет ReactiveMongoTemplate и ReactiveMongoRepository, оба из которых имеют обширную функциональность сопоставления.

Spring WebFlux предоставляет веб-инфраструктуру с реактивным стеком для Spring, обеспечивая неблокирующий код и противодействие Reactive Streams. Он использует Reactor в качестве своей реактивной библиотеки. Кроме того, он предоставляет WebClient для выполнения HTTP-запросов с обратным давлением Reactive Streams. Он использует Reactor Netty в качестве клиентской библиотеки HTTP.

5.1. Служба инвентаризации

Мы начнем с изменения наших конечных точек для создания реактивных издателей:

@GetMapping
public Flux<Product> getAllProducts() {
    return productService.getProducts();
}
@PostMapping
public Mono<Order> processOrder(@RequestBody Order order) {
    return productService.handleOrder(order);
}

@DeleteMapping
public Mono<Order> revertOrder(@RequestBody Order order) {
    return productService.revertOrder(order);
}

@Transactional
public Mono<Order> handleOrder(Order order) {
    return Flux.fromIterable(order.getLineItems())
      .flatMap(l -> productRepository.findById(l.getProductId()))
      .flatMap(p -> {
          int q = order.getLineItems().stream()
            .filter(l -> l.getProductId().equals(p.getId()))
            .findAny().get()
            .getQuantity();
          if (p.getStock() >= q) {
              p.setStock(p.getStock() - q);
              return productRepository.save(p);
          } else {
              return Mono.error(new RuntimeException("Product is out of stock: " + p.getId()));
          }
      })
      .then(Mono.just(order.setOrderStatus("SUCCESS")));
}

@Transactional
public Mono<Order> revertOrder(Order order) {
    return Flux.fromIterable(order.getLineItems())
      .flatMap(l -> productRepository.findById(l.getProductId()))
      .flatMap(p -> {
          int q = order.getLineItems().stream()
            .filter(l -> l.getProductId().equals(p.getId()))
            .findAny().get()
            .getQuantity();
          p.setStock(p.getStock() + q);
          return productRepository.save(p);
      })
      .then(Mono.just(order.setOrderStatus("SUCCESS")));
}

Очевидно, нам также придется внести необходимые изменения в службу:

5.2 . Служба доставки

@PostMapping
public Mono<Order> process(@RequestBody Order order) {
    return shippingService.handleOrder(order);
}

Точно так же мы изменим конечную точку нашей службы доставки:

public Mono<Order> handleOrder(Order order) {
    return Mono.just(order)
      .flatMap(o -> {
          LocalDate shippingDate = null;
          if (LocalTime.now().isAfter(LocalTime.parse("10:00"))
            && LocalTime.now().isBefore(LocalTime.parse("18:00"))) {
              shippingDate = LocalDate.now().plusDays(1);
          } else {
              return Mono.error(new RuntimeException("The current time is off the limits to place order."));
          }
          return shipmentRepository.save(new Shipment()
            .setAddress(order.getShippingAddress())
            .setShippingDate(shippingDate));
      })
      .map(s -> order.setShippingDate(s.getShippingDate())
        .setOrderStatus(OrderStatus.SUCCESS));
    }

И соответствующие изменения в службе для использования реактивного программирования:

5.3. Служба заказов

@PostMapping
public Mono<Order> create(@RequestBody Order order) {
    return orderService.createOrder(order)
      .flatMap(o -> {
          if (OrderStatus.FAILURE.equals(o.getOrderStatus())) {
              return Mono.error(new RuntimeException("Order processing failed, please try again later. " + o.getResponseMessage()));
          } else {
              return Mono.just(o);
          }
      });
}

@GetMapping
public Flux<Order> getAll() {
    return orderService.getOrders();
}

Нам придется внести аналогичные изменения в конечные точки службы заказов:

public Mono<Order> createOrder(Order order) {
    return Mono.just(order)
      .flatMap(orderRepository::save)
      .flatMap(o -> {
          return webClient.method(HttpMethod.POST)
            .uri(inventoryServiceUrl)
            .body(BodyInserters.fromValue(o))
            .exchange();
      })
      .onErrorResume(err -> {
          return Mono.just(order.setOrderStatus(OrderStatus.FAILURE)
            .setResponseMessage(err.getMessage()));
      })
      .flatMap(o -> {
          if (!OrderStatus.FAILURE.equals(o.getOrderStatus())) {
              return webClient.method(HttpMethod.POST)
                .uri(shippingServiceUrl)
                .body(BodyInserters.fromValue(o))
                .exchange();
          } else {
              return Mono.just(o);
          }
      })
      .onErrorResume(err -> {
          return webClient.method(HttpMethod.POST)
            .uri(inventoryServiceUrl)
            .body(BodyInserters.fromValue(order))
            .retrieve()
            .bodyToMono(Order.class)
            .map(o -> o.setOrderStatus(OrderStatus.FAILURE)
              .setResponseMessage(err.getMessage()));
      })
      .map(o -> {
          if (!OrderStatus.FAILURE.equals(o.getOrderStatus())) {
              return order.setShippingDate(o.getShippingDate())
                .setOrderStatus(OrderStatus.SUCCESS);
          } else {
              return order.setOrderStatus(OrderStatus.FAILURE)
                .setResponseMessage(o.getResponseMessage());
          }
      })
      .flatMap(orderRepository::save);
}

public Flux<Order> getOrders() {
    return orderRepository.findAll();
}

Изменения в службе будут более сложными, поскольку нам придется использовать Spring WebClient для вызова инвентаризации. и поставка реактивных конечных точек:

Такой вид оркестровки с реактивными API — непростая задача, часто подверженная ошибкам, а также трудная для отладки. Мы увидим, как это можно упростить в следующем разделе.

5.4. Внешний интерфейс

Теперь, когда наши API-интерфейсы способны передавать события по мере их возникновения, вполне естественно, что мы должны иметь возможность использовать это и в нашем внешнем интерфейсе. К счастью, Angular поддерживает EventSource, интерфейс для событий, отправленных сервером.

getOrderStream() {
    return Observable.create((observer) => {
        let eventSource = new EventSource('http://localhost:8080/api/orders')
        eventSource.onmessage = (event) => {
            let json = JSON.parse(event.data)
            this.orders.push(json)
            this._zone.run(() => {
                observer.next(this.orders)
            })
        }
        eventSource.onerror = (error) => {
            if(eventSource.readyState === 0) {
                eventSource.close()
                this._zone.run(() => {
                    observer.complete()
                })
            } else {
                this._zone.run(() => {
                    observer.error('EventSource error: ' + error)
                })
            }
        }
    })
}

Давайте посмотрим, как мы можем получить и обработать все наши предыдущие заказы в виде потока событий:

6. Архитектура, управляемая сообщениями

Первая проблема, которую мы собираемся решить, связана с обслуживанием. связь со службой. Сейчас эти коммуникации синхронны, что создает несколько проблем. К ним относятся каскадные сбои, сложная оркестровка и распределенные транзакции, и это лишь некоторые из них.

Очевидный способ решить эту проблему — сделать эти коммуникации асинхронными. Брокер сообщений для облегчения связи между службами может помочь нам. Мы будем использовать Kafka в качестве нашего брокера сообщений и Spring для Kafka для создания и потребления сообщений:

Мы будем использовать одну тему для создания и использования сообщений о заказах с различными статусами заказов для реагирования служб.

Давайте посмотрим, как нужно изменить каждую службу.

6.1. Служба инвентаризации

@Autowired
private KafkaTemplate<String, Order> kafkaTemplate;

public void sendMessage(Order order) {
    this.kafkaTemplate.send("orders", order);
}

Давайте начнем с определения производителя сообщений для нашей службы инвентаризации:

@KafkaListener(topics = "orders", groupId = "inventory")
public void consume(Order order) throws IOException {
    if (OrderStatus.RESERVE_INVENTORY.equals(order.getOrderStatus())) {
        productService.handleOrder(order)
          .doOnSuccess(o -> {
              orderProducer.sendMessage(order.setOrderStatus(OrderStatus.INVENTORY_SUCCESS));
          })
          .doOnError(e -> {
              orderProducer.sendMessage(order.setOrderStatus(OrderStatus.INVENTORY_FAILURE)
                .setResponseMessage(e.getMessage()));
          }).subscribe();
    } else if (OrderStatus.REVERT_INVENTORY.equals(order.getOrderStatus())) {
        productService.revertOrder(order)
          .doOnSuccess(o -> {
              orderProducer.sendMessage(order.setOrderStatus(OrderStatus.INVENTORY_REVERT_SUCCESS));
          })
          .doOnError(e -> {
              orderProducer.sendMessage(order.setOrderStatus(OrderStatus.INVENTORY_REVERT_FAILURE)
                .setResponseMessage(e.getMessage()));
          }).subscribe();
    }
}

Далее нам нужно определить потребителя сообщений для службы инвентаризации, чтобы он реагировал на различные сообщения по теме:

~ ~~ Это также означает, что теперь мы можем безопасно удалить некоторые избыточные конечные точки из нашего контроллера. Этих изменений достаточно для достижения асинхронной связи в нашем приложении.

6.2. Служба доставки

@KafkaListener(topics = "orders", groupId = "shipping")
public void consume(Order order) throws IOException {
    if (OrderStatus.PREPARE_SHIPPING.equals(order.getOrderStatus())) {
        shippingService.handleOrder(order)
          .doOnSuccess(o -> {
              orderProducer.sendMessage(order.setOrderStatus(OrderStatus.SHIPPING_SUCCESS)
                .setShippingDate(o.getShippingDate()));
          })
          .doOnError(e -> {
              orderProducer.sendMessage(order.setOrderStatus(OrderStatus.SHIPPING_FAILURE)
                .setResponseMessage(e.getMessage()));
          }).subscribe();
    }
}

Изменения в службе доставки относительно аналогичны тому, что мы сделали ранее со службой инвентаризации. Производитель сообщения тот же, а потребитель сообщения специфичен для логики доставки:

Теперь мы можем безопасно удалить все конечные точки в нашем контроллере, поскольку они нам больше не нужны.

6.3. Заказать услугу

«Изменения в службе заказов будут немного более сложными, так как именно здесь мы выполняли всю оркестровку ранее.

@KafkaListener(topics = "orders", groupId = "orders")
public void consume(Order order) throws IOException {
    if (OrderStatus.INITIATION_SUCCESS.equals(order.getOrderStatus())) {
        orderRepository.findById(order.getId())
          .map(o -> {
              orderProducer.sendMessage(o.setOrderStatus(OrderStatus.RESERVE_INVENTORY));
              return o.setOrderStatus(order.getOrderStatus())
                .setResponseMessage(order.getResponseMessage());
          })
          .flatMap(orderRepository::save)
          .subscribe();
    } else if ("INVENTORY-SUCCESS".equals(order.getOrderStatus())) {
        orderRepository.findById(order.getId())
          .map(o -> {
              orderProducer.sendMessage(o.setOrderStatus(OrderStatus.PREPARE_SHIPPING));
              return o.setOrderStatus(order.getOrderStatus())
                .setResponseMessage(order.getResponseMessage());
          })
          .flatMap(orderRepository::save)
          .subscribe();
    } else if ("SHIPPING-FAILURE".equals(order.getOrderStatus())) {
        orderRepository.findById(order.getId())
          .map(o -> {
              orderProducer.sendMessage(o.setOrderStatus(OrderStatus.REVERT_INVENTORY));
              return o.setOrderStatus(order.getOrderStatus())
                .setResponseMessage(order.getResponseMessage());
          })
          .flatMap(orderRepository::save)
          .subscribe();
    } else {
        orderRepository.findById(order.getId())
          .map(o -> {
              return o.setOrderStatus(order.getOrderStatus())
                .setResponseMessage(order.getResponseMessage());
          })
          .flatMap(orderRepository::save)
          .subscribe();
    }
}

Тем не менее, производитель сообщения остается неизменным, а потребитель сообщений берет на себя логику, специфичную для службы заказа:

Здесь потребитель просто реагирует на сообщения заказа с разными статусами заказа. Это то, что дает нам хореографию между различными сервисами.

public Mono<Order> createOrder(Order order) {
    return Mono.just(order)
      .flatMap(orderRepository::save)
      .map(o -> {
          orderProducer.sendMessage(o.setOrderStatus(OrderStatus.INITIATION_SUCCESS));
          return o;
      })
      .onErrorResume(err -> {
          return Mono.just(order.setOrderStatus(OrderStatus.FAILURE)
            .setResponseMessage(err.getMessage()));
      })
      .flatMap(orderRepository::save);
}

Наконец, наш сервис заказов также должен измениться, чтобы поддерживать эту хореографию:

Обратите внимание, что это намного проще, чем сервис, который мы должны были написать с реактивными конечными точками в последнем разделе. Асинхронная хореография часто приводит к гораздо более простому коду, хотя и достигается за счет возможной согласованности и сложной отладки и мониторинга. Как можно догадаться, наш интерфейс больше не будет сразу получать окончательный статус заказа.

7. Служба оркестрации контейнеров

Последняя часть головоломки, которую мы хотим решить, связана с развертыванием.

Чего мы хотим от приложения, так это достаточной избыточности и тенденции к автоматическому масштабированию вверх или вниз в зависимости от необходимости.

Мы уже добились контейнеризации сервисов через Docker и управляем зависимостями между ними через Docker Compose. Хотя это фантастические инструменты сами по себе, они не помогают нам достичь того, чего мы хотим.

Следовательно, нам нужна служба оркестрации контейнеров, которая может позаботиться об избыточности и масштабируемости нашего приложения. Хотя существует несколько вариантов, одним из популярных является Kubernetes. Kubernetes предоставляет нам независимый от поставщика облачных услуг способ добиться высокомасштабируемого развертывания контейнерных рабочих нагрузок.

Kubernetes оборачивает контейнеры, такие как Docker, в поды, которые являются наименьшей единицей развертывания. Кроме того, мы можем использовать Deployment для декларативного описания желаемого состояния.

Развертывание создает наборы реплик, которые внутренне отвечают за запуск модулей. Мы можем описать минимальное количество одинаковых подов, которые должны работать в любой момент времени. Это обеспечивает избыточность и, следовательно, высокую доступность.

apiVersion: apps/v1
kind: Deployment
metadata: 
  name: inventory-deployment
spec: 
  replicas: 3
  selector:
    matchLabels:
      name: inventory-deployment
  template: 
    metadata: 
      labels: 
        name: inventory-deployment
    spec: 
      containers:
      - name: inventory
        image: inventory-service-async:latest
        ports: 
        - containerPort: 8081
---
apiVersion: apps/v1
kind: Deployment
metadata: 
  name: shipping-deployment
spec: 
  replicas: 3
  selector:
    matchLabels:
      name: shipping-deployment
  template: 
    metadata: 
      labels: 
        name: shipping-deployment
    spec: 
      containers:
      - name: shipping
        image: shipping-service-async:latest
        ports: 
        - containerPort: 8082
---
apiVersion: apps/v1
kind: Deployment
metadata: 
  name: order-deployment
spec: 
  replicas: 3
  selector:
    matchLabels:
      name: order-deployment
  template: 
    metadata: 
      labels: 
        name: order-deployment
    spec: 
      containers:
      - name: order
        image: order-service-async:latest
        ports: 
        - containerPort: 8080

Давайте посмотрим, как мы можем определить развертывание Kubernetes для наших приложений:

Здесь мы объявляем наше развертывание для поддержки трех идентичных реплик модулей в любое время. Хотя это хороший способ добавить избыточность, этого может быть недостаточно для меняющихся нагрузок. Kubernetes предоставляет еще один ресурс, известный как Horizontal Pod Autoscaler, который может масштабировать количество модулей в развертывании на основе наблюдаемых показателей, таких как загрузка ЦП.

Обратите внимание, что мы только что рассмотрели аспекты масштабируемости приложения, размещенного в кластере Kubernetes. Это не обязательно означает, что сам базовый кластер является масштабируемым. Создание кластера Kubernetes с высокой доступностью — нетривиальная задача, которая выходит за рамки данного руководства.

8. Результирующая реактивная система

    Теперь, когда мы сделали несколько улучшений в нашей архитектуре, возможно, пришло время сравнить это с определением реактивной системы. Мы продолжим оценку по четырем характеристикам реактивных систем, которые мы обсуждали ранее в этом руководстве:

Отзывчивость: принятие парадигмы реактивного программирования должно помочь нам достичь сквозного неблокирующего и, следовательно, отзывчивого приложения. Устойчивость: развертывание Kubernetes с ReplicaSet желаемого количества модулей должно обеспечивать устойчивость к случайным сбоям. Эластичность: кластер и ресурсы Kubernetes должны предоставлять нам необходимую поддержку, чтобы быть эластичными перед лицом непредсказуемых нагрузок. асинхронное взаимодействие через брокера Kafka должно помочь нам в этом

Хотя это выглядит довольно многообещающе, это далеко не конец. Честно говоря, поиск действительно реактивной системы должен быть постоянным упражнением в улучшении. Мы никогда не сможем упредить все, что может дать сбой в очень сложной инфраструктуре, где наше приложение является лишь небольшой частью.

«Таким образом, реактивная система требует надежности от каждой части, из которой состоит целое. От физической сети до инфраструктурных сервисов, таких как DNS, все они должны соответствовать друг другу, чтобы помочь нам достичь конечной цели.

Часто мы не можем управлять всеми этими деталями и предоставлять необходимые гарантии. И здесь управляемая облачная инфраструктура помогает облегчить нашу боль. Мы можем выбирать из множества услуг, таких как IaaS (Infeastrure-as-a-Service), BaaS (Backend-as-a-Service) и PaaS (Platform-as-a-Service), чтобы делегировать обязанности внешним сторонам. Это оставляет нас с ответственностью за наше приложение, насколько это возможно.

9. Заключение

В этом уроке мы рассмотрели основы реактивных систем и их сравнение с реактивным программированием. Мы создали простое приложение с несколькими микросервисами и выделили проблемы, которые собираемся решить с помощью реактивной системы.

Далее мы пошли дальше, представив реактивное программирование, архитектуру на основе сообщений и службу оркестрации контейнеров в архитектуре для реализации реактивной системы.

Наконец, мы обсудили получившуюся архитектуру и то, как она остается на пути к реактивной системе! Этот учебник не знакомит нас со всеми инструментами, фреймворками или шаблонами, которые могут помочь нам создать реактивную систему, но он знакомит нас с путешествием.