«1. Обзор

В этом руководстве показано, как создать простое реактивное приложение Spring Boot, которое интегрируется с сервером обмена сообщениями RabbitMQ, популярной реализацией стандарта обмена сообщениями AMQP.

Мы рассмотрим оба сценария — «точка-точка» и «публикация-подписка» — с использованием распределенной установки, которая подчеркивает различия между обоими шаблонами.

Обратите внимание, что мы предполагаем базовые знания AMQP, RabbitMQ и Spring Boot, в частности, таких ключевых понятий, как Exchanges, Queues, Topics и так далее. Дополнительную информацию об этих концепциях можно найти по ссылкам ниже:

    Обмен сообщениями с помощью Spring AMQP. Введение в RabbitMQ

2. Настройка сервера RabbitMQ

Хотя мы могли бы настроить локальный RabbitMQ локально, на практике мы скорее всего, будет использоваться выделенная установка с дополнительными функциями, такими как высокая доступность, мониторинг, безопасность и т. д.

Чтобы смоделировать такую ​​среду на нашей машине разработки, мы будем использовать Docker для создания сервера, который будет использовать наше приложение. .

Следующая команда запустит автономный сервер RabbitMQ:

$ docker run -d --name rabbitmq -p 5672:5672 rabbitmq:3

Мы не объявляем никаких постоянных томов, поэтому непрочитанные сообщения будут потеряны между перезапусками. Служба будет доступна на порту 5672 на хосте.

Мы можем проверить журналы сервера с помощью команды docker logs, которая должна выдать такой вывод:

$ docker logs rabbitmq
2018-06-09 13:42:29.718 [info] <0.33.0>
  Application lager started on node [email protected]
// ... some lines omitted
2018-06-09 13:42:33.491 [info] <0.226.0>
 Starting RabbitMQ 3.7.5 on Erlang 20.3.5
 Copyright (C) 2007-2018 Pivotal Software, Inc.
 Licensed under the MPL.  See http://www.rabbitmq.com/

  ##  ##
  ##  ##      RabbitMQ 3.7.5. Copyright (C) 2007-2018 Pivotal Software, Inc.
  ##########  Licensed under the MPL.  See http://www.rabbitmq.com/
  ######  ##
  ##########  Logs: <stdout>

              Starting broker...
2018-06-09 13:42:33.494 [info] <0.226.0>
 node           : [email protected]
 home dir       : /var/lib/rabbitmq
 config file(s) : /etc/rabbitmq/rabbitmq.conf
 cookie hash    : CY9rzUYh03PK3k6DJie09g==
 log(s)         : <stdout>
 database dir   : /var/lib/rabbitmq/mnesia/[email protected]

// ... more log lines

Поскольку образ включает утилиту rabbitmqctl, мы можем использовать ее для выполнения административных задач в контексте нашего бегущего образа.

Например, мы можем получить информацию о состоянии сервера с помощью следующей команды:


$ docker exec rabbitmq rabbitmqctl status
Status of node [email protected] ...
[{pid,299},
 {running_applications,
     [{rabbit,"RabbitMQ","3.7.5"},
      {rabbit_common,
          "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
          "3.7.5"},
// ... other info omitted for brevity

Другие полезные команды включают в себя:

    list_exchanges: список всех объявленных бирж list_queues: список всех объявленных очередей, включая количество непрочитанных сообщений list_bindings: список всех определений привязок между обменами и очередями, включая ключи маршрутизации

3. Настройка проекта Spring AMQP

После того, как наш сервер RabbitMQ запущен и работает, мы можем перейти к созданию нашего проекта Spring. Этот пример проекта позволит любому клиенту REST отправлять и/или получать сообщения на сервер обмена сообщениями, используя модуль Spring AMQP и соответствующий стартер Spring Boot для связи с ним.

Основные зависимости, которые нам нужно добавить в наш файл проекта pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.0.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>2.0.2.RELEASE</version> 
</dependency>

spring-boot-starter-amqp содержит все, что связано с AMQP, основная зависимость, используемая для реализации нашего реактивного сервера REST.

Примечание: вы можете проверить последнюю версию модулей Spring Boot Starter AMQP и Webflux на Maven Central.


4. Сценарий 1: двухточечный обмен сообщениями

В этом первом сценарии мы будем использовать прямой обмен, который является логическим объектом в брокере, который получает сообщения от клиентов.

Прямой обмен направляет все входящие сообщения в одну — и только одну — очередь, из которой они будут доступны для потребления клиентами. Несколько клиентов могут подписаться на одну и ту же очередь, но только один из них получит данное сообщение.

4.1. Настройка Exchange и очередей

В нашем сценарии мы используем объект DestinationInfo, который инкапсулирует имя Exchange и ключ маршрутизации. Карта с указанием имени пункта назначения будет использоваться для хранения всех доступных пунктов назначения.

За эту первоначальную настройку будет отвечать следующий метод @PostConstruct:

@Autowired
private AmqpAdmin amqpAdmin;
    
@Autowired
private DestinationsConfig destinationsConfig;

@PostConstruct
public void setupQueueDestinations() {
    destinationsConfig.getQueues()
        .forEach((key, destination) -> {
            Exchange ex = ExchangeBuilder.directExchange(
              destination.getExchange())
              .durable(true)
              .build();
            amqpAdmin.declareExchange(ex);
            Queue q = QueueBuilder.durable(
              destination.getRoutingKey())
              .build();
            amqpAdmin.declareQueue(q);
            Binding b = BindingBuilder.bind(q)
              .to(ex)
              .with(destination.getRoutingKey())
              .noargs();
            amqpAdmin.declareBinding(b);
        });
}

Этот метод использует bean-компонент adminAmqp, созданный Spring, для объявления обменов, очередей и связывания их вместе с использованием заданного ключа маршрутизации.

Все назначения поступают из bean-компонента DestinationsConfig, который является классом @ConfigurationProperties, используемым в нашем примере.

Этот класс имеет свойство, заполненное объектами DestinationInfo, построенными на основе сопоставлений, считанных из файла конфигурации application.yml.

4.2. Конечная точка производителя

Производители будут отправлять сообщения, отправляя HTTP POST в местоположение /queue/{name}.

Это реактивная конечная точка, поэтому мы используем Mono для возврата простого подтверждения:

@SpringBootApplication
@EnableConfigurationProperties(DestinationsConfig.class)
@RestController
public class SpringWebfluxAmqpApplication {
 
    // ... other members omitted
 
    @Autowired
    private AmqpTemplate amqpTemplate;

    @PostMapping(value = "/queue/{name}")
    public Mono<ResponseEntity<?>> sendMessageToQueue(
      @PathVariable String name, @RequestBody String payload) {

        DestinationInfo d = destinationsConfig
          .getQueues().get(name);
        if (d == null) {
            return Mono.just(
              ResponseEntity.notFound().build());
        }
    
        return Mono.fromCallable(() -> {
            amqpTemplate.convertAndSend(
              d.getExchange(), 
              d.getRoutingKey(), 
              payload);  
            return ResponseEntity.accepted().build();
        });
    }

«

«Сначала мы проверяем, соответствует ли параметр name допустимому месту назначения, и если да, мы используем экземпляр amqpTemplate с автоматическим связыванием, чтобы фактически отправить полезную нагрузку — простое строковое сообщение — в RabbitMQ.

4.3. MessageListenerContainer Factory

Для асинхронного получения сообщений Spring AMQP использует абстрактный класс MessageContainerListener, который является посредником в потоке информации из очередей AMQP и прослушивателей, предоставляемых приложением.

Поскольку нам нужна конкретная реализация этого класса, чтобы прикрепить наши прослушиватели сообщений, мы определяем фабрику, которая изолирует код контроллера от его фактической реализации.

@Component
public class MessageListenerContainerFactory {

    @Autowired
    private ConnectionFactory connectionFactory;

    public MessageListenerContainerFactory() {}

    public MessageListenerContainer createMessageListenerContainer(String queueName) {
        SimpleMessageListenerContainer mlc = new SimpleMessageListenerContainer(connectionFactory);
        mlc.addQueueNames(queueName);
        return mlc;
    }
}

В нашем случае фабричный метод возвращает новый SimpleMessageContainerListener каждый раз, когда мы вызываем его метод createMessageListenerContainer:

4.4. Конечная точка потребителя

Потребители будут получать доступ к тому же адресу конечной точки, который используется производителями (/queue/{name}) для получения сообщений.

@Autowired
private MessageListenerContainerFactory messageListenerContainerFactory;

@GetMapping(
  value = "/queue/{name}",
  produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromQueue(@PathVariable String name) {

    DestinationInfo d = destinationsConfig
      .getQueues()
      .get(name);
    if (d == null) {
        return Flux.just(ResponseEntity.notFound()
          .build());
    }

    MessageListenerContainer mlc = messageListenerContainerFactory
      .createMessageListenerContainer(d.getRoutingKey());

    Flux<String> f = Flux.<String> create(emitter -> {
        mlc.setupMessageListener((MessageListener) m -> {
            String payload = new String(m.getBody());
            emitter.next(payload);
        });
        emitter.onRequest(v -> {
            mlc.start();
        });
        emitter.onDispose(() -> {
            mlc.stop();
        });
      });

    return Flux.interval(Duration.ofSeconds(5))
      .map(v -> "No news is good news")
      .mergeWith(f);
}

Эта конечная точка возвращает поток событий, где каждое событие соответствует полученному сообщению:

После первоначальной проверки имени получателя конечная точка потребителя создает MessageListenerContainer, используя MessageListenerContainerFactory и имя очереди, восстановленное из нашего реестр.

Получив наш MessageListenerContainer, мы создаем поток сообщений, используя один из его методов конструктора create().

В нашем конкретном случае мы используем тот, который принимает лямбду с аргументом FluxSink, который мы затем используем для соединения асинхронного API Spring AMQP на основе прослушивателя с нашим реактивным приложением.

Мы также присоединяем две дополнительные лямбды к обратным вызовам эмиттера onRequest() и onDispose(), чтобы наш MessageListenerContainer мог выделять/освобождать свои внутренние ресурсы в соответствии с жизненным циклом Flux.

Наконец, мы объединяем полученный поток с другим потоком, созданным с помощью interval(), который создает новое событие каждые пять секунд. Эти фиктивные сообщения играют важную роль в нашем случае: без них мы бы обнаружили отключение клиента только после получения сообщения и невозможности его отправки, что может занять много времени в зависимости от вашего конкретного варианта использования.

4.5. Тестирование

Теперь, когда у нас настроены конечные точки потребителя и издателя, мы можем провести некоторые тесты с нашим образцом приложения.

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    
destinations:
  queues:
    NYSE:
      exchange: nyse
      routing-key: NYSE

Нам нужно определить детали подключения к серверу RabbitMQ и по крайней мере один пункт назначения в нашем application.yml, который должен выглядеть следующим образом:

Свойства spring.rabbitmq.* определяют основные свойства, необходимые для подключения на наш сервер RabbitMQ, работающий в локальном контейнере Docker. Обратите внимание, что IP-адрес, показанный выше, является лишь примером и может отличаться в конкретной настройке.

Очереди задаются с использованием destinations.queues.\u003cимя\u003e.*, где \u003cимя\u003e используется в качестве имени назначения. Здесь мы объявили один пункт назначения с именем «NYSE», который будет отправлять сообщения на биржу «nyse» на RabbitMQ с ключом маршрутизации «NYSE».

Как только мы запустим сервер через командную строку или из нашей IDE, мы сможем начать отправлять и получать сообщения. Мы будем использовать утилиту curl, общую утилиту, доступную для ОС Windows, Mac и Linux.

$ curl -v -d "Test message" http://localhost:8080/queue/NYSE
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /queue/NYSE HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
> Content-Length: 12
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 12 out of 12 bytes
< HTTP/1.1 202 Accepted
< content-length: 0
<
* Connection #0 to host localhost left intact

В следующем листинге показано, как отправить сообщение адресату и ожидаемый ответ от сервера:

$ docker exec rabbitmq rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
NYSE    1

После выполнения этой команды мы можем убедиться, что сообщение было получено RabbitMQ и готово к использованию, выдав следующая команда:

$ curl -v http://localhost:8080/queue/NYSE
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /queue/NYSE HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
<
data:Test message

data:No news is good news...

... same message repeating every 5 secs

Теперь мы можем читать сообщения с помощью curl с помощью следующей команды:

Как мы видим, сначала мы получаем ранее сохраненное сообщение, а затем начинаем получать наше фиктивное сообщение каждые 5 секунды.

Если мы снова запустим команду для вывода списка очередей, мы увидим, что сообщений нет:

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
NYSE    0

$ docker exec rabbitmq rabbitmqctl list_queues

5. Сценарий 2: Публикация-подписка

Другой распространенным сценарием для приложений обмена сообщениями является шаблон публикации-подписки, в котором одно сообщение должно быть отправлено нескольким потребителям.

RabbitMQ предлагает два типа обменов, поддерживающих такие приложения: разветвленный и тематический.

«Основное различие между этими двумя типами заключается в том, что последний позволяет нам фильтровать, какие сообщения получать, на основе шаблона ключа маршрутизации (например, «alarm.mailserver.*»), предоставленного во время регистрации, в то время как первый просто копирует входящие сообщения в все связанные очереди.

RabbitMQ также поддерживает обмен заголовками, что позволяет выполнять более сложную фильтрацию сообщений, но его использование выходит за рамки этой статьи.

5.1. Настройка мест назначения

Мы определяем места назначения Pub/Sub во время запуска с помощью другого метода @PostConstruct, как мы это делали в сценарии «точка-точка».

@PostConstruct
public void setupTopicDestinations(
    destinationsConfig.getTopics()
      .forEach((key, destination) -> {
          Exchange ex = ExchangeBuilder
            .topicExchange(destination.getExchange())
            .durable(true)
            .build();
            amqpAdmin.declareExchange(ex);
      });
}

Единственное отличие состоит в том, что мы создаем только обмены, но не очереди — они будут созданы по требованию и привязаны к обмену позже, так как нам нужна эксклюзивная очередь для каждого клиента:

5.2 . Конечная точка издателя

Клиенты будут использовать конечную точку издателя, доступную в расположении /topic/{name}, для публикации сообщений, которые будут отправлены всем подключенным клиентам.

@PostMapping(value = "/topic/{name}")
public Mono<ResponseEntity<?>> sendMessageToTopic(
  @PathVariable String name, @RequestBody String payload) {

    DestinationInfo d = destinationsConfig
      .getTopics()
      .get(name);
    
    if (d == null) {
        return Mono.just(ResponseEntity.notFound().build());
    }      
    
   return Mono.fromCallable(() -> {
       amqpTemplate.convertAndSend(
         d.getExchange(), d.getRoutingKey(),payload);   
            return ResponseEntity.accepted().build();
        });
    }

Как и в предыдущем сценарии, мы используем @PostMapping, который возвращает Mono со статусом после отправки сообщения:

5.3. Конечная точка подписчика

Наша конечная точка подписчика будет располагаться по адресу /topic/{name}, создавая поток сообщений для подключенных клиентов.

@GetMapping(
  value = "/topic/{name}",
  produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromTopic(@PathVariable String name) {
    DestinationInfo d = destinationsConfig.getTopics()
        .get(name);
    if (d == null) {
        return Flux.just(ResponseEntity.notFound()
            .build());
    }
    Queue topicQueue = createTopicQueue(d);
    String qname = topicQueue.getName();
    MessageListenerContainer mlc = messageListenerContainerFactory.createMessageListenerContainer(qname);
    Flux<String> f = Flux.<String> create(emitter -> {
        mlc.setupMessageListener((MessageListener) m -> {
            String payload = new String(m.getBody());
            emitter.next(payload);
        });
        emitter.onRequest(v -> {
            mlc.start();
        });
        emitter.onDispose(() -> {
            amqpAdmin.deleteQueue(qname);
            mlc.stop();
        });            
      });
    
    return Flux.interval(Duration.ofSeconds(5))
        .map(v -> "No news is good news")
        .mergeWith(f);
}

Эти сообщения включают в себя как полученные сообщения, так и фиктивные сообщения, генерируемые каждые 5 секунд:

Этот код в основном такой же, как мы видели в предыдущем случае, со следующими отличиями: во-первых, мы создаем новая Очередь для каждого нового подписчика.

private Queue createTopicQueue(DestinationInfo destination) {

    Exchange ex = ExchangeBuilder
      .topicExchange(destination.getExchange())
      .durable(true)
      .build();
    amqpAdmin.declareExchange(ex);
    Queue q = QueueBuilder
      .nonDurable()
      .build();     
    amqpAdmin.declareQueue(q);
    Binding b = BindingBuilder.bind(q)
      .to(ex)
      .with(destination.getRoutingKey())
      .noargs();        
    amqpAdmin.declareBinding(b);
    return q;
}

Мы делаем это с помощью вызова метода createTopicQueue(), который использует информацию из экземпляра DestinationInfo для создания эксклюзивной неустойчивой очереди, которую мы затем привязываем к Exchange с помощью настроенного ключа маршрутизации:

~ ~~ Обратите внимание, что несмотря на то, что мы снова объявляем Exchange, RabbitMQ не создаст новый, поскольку мы уже объявили его во время запуска.

Второе отличие заключается в лямбда-выражении, которое мы передаем методу onDispose(), который на этот раз также удалит очередь при отключении подписчика.

5.3. Тестирование

destinations:
## ... queue destinations omitted      
  topics:
    weather:
      exchange: alerts
      routing-key: WEATHER

Чтобы протестировать сценарий Pub-Sub, мы должны сначала определить пункт назначения темы в нашем application.yml следующим образом:

Здесь мы определили конечную точку темы, которая будет доступна в / тема/погода местоположение. Эта конечная точка будет использоваться для отправки сообщений в обмен «оповещениями» на RabbitMQ с ключом маршрутизации «ПОГОДА».

$ docker exec docker_rabbitmq_1 rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
amq.topic       topic
amq.fanout      fanout
amq.match       headers
amq.headers     headers
        direct
amq.rabbitmq.trace      topic
amq.direct      direct
alerts  topic

После запуска сервера мы можем убедиться, что обмен был создан с помощью команды rabbitmqctl:

$ docker exec rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
        exchange        NYSE    queue   NYSE    []
nyse    exchange        NYSE    queue   NYSE    []

Теперь, если мы выполним команду list_bindings, мы увидим, что нет очередей, связанных с «оповещениями». € exchange:

$ curl -v http://localhost:8080/topic/weather
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /topic/weather HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
<
data:No news is good news...

# ... same message repeating indefinitely

Давайте запустим пару подписчиков, которые будут подписываться на наш пункт назначения, открыв две командные оболочки и выполнив следующую команду для каждой:

$ curl -v -d "Hurricane approaching!" http://localhost:8080/topic/weather
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /topic/weather HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
> Content-Length: 22
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 22 out of 22 bytes
< HTTP/1.1 202 Accepted
< content-length: 0
<
* Connection #0 to host localhost left intact

Наконец, мы используем curl один раз еще раз, чтобы послать несколько предупреждений нашим подписчикам:

Как только мы отправим сообщение, мы можем почти мгновенно увидеть сообщение «Ураган приближается!» на каждой оболочке подписчика.

$ docker exec rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
        exchange        IBOV    queue   IBOV    []
        exchange        NYSE    queue   NYSE    []
        exchange        spring.gen-i0m0pbyKQMqpz2_KFZCd0g       
  queue   spring.gen-i0m0pbyKQMqpz2_KFZCd0g       []
        exchange        spring.gen-wCHALTsIS1q11PQbARJ7eQ       
  queue   spring.gen-wCHALTsIS1q11PQbARJ7eQ       []
alerts  exchange        spring.gen-i0m0pbyKQMqpz2_KFZCd0g     
  queue   WEATHER []
alerts  exchange        spring.gen-wCHALTsIS1q11PQbARJ7eQ     
  queue   WEATHER []
ibov    exchange        IBOV    queue   IBOV    []
nyse    exchange        NYSE    queue   NYSE    []
quotes  exchange        NYSE    queue   NYSE    []

Если мы сейчас проверим доступные привязки, мы увидим, что у нас есть одна очередь для каждого подписчика:

Как только мы нажмем Ctrl-C в оболочке подписчика, наш шлюз в конечном итоге обнаружит, что клиент отключился и удалит эти привязки.

6. Заключение

В этой статье мы продемонстрировали, как создать простое реактивное приложение, которое взаимодействует с сервером RabbitMQ, используя модуль spring-amqp.

С помощью всего нескольких строк кода мы смогли создать функциональный шлюз HTTP-to-AMQP, который поддерживает шаблоны интеграции «точка-точка» и «публикация-подписка», которые мы можем легко расширить, добавив дополнительные функции, такие как безопасность за счет добавления стандартных функций Spring.