«1. Введение

В этом руководстве мы создадим производителя и потребителя сообщений, используя Spring Boot и Apache RocketMQ, платформу распределенного обмена сообщениями и потоковых данных с открытым исходным кодом.

2. Зависимости

Для проектов Maven нам нужно добавить зависимость RocketMQ Spring Boot Starter:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>

3. Создание сообщений

В нашем примере мы создадим базовый генератор сообщений который будет отправлять события всякий раз, когда пользователь добавляет или удаляет товар из корзины.

Во-первых, давайте настроим расположение нашего сервера и имя группы в наших application.properties:

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=cart-producer-group

Обратите внимание, что если бы у нас было более одного сервера имен, мы могли бы перечислить их как host:port;host:port.

Теперь для простоты мы создадим приложение CommandLineRunner и сгенерируем несколько событий во время запуска приложения:

@SpringBootApplication
public class CartEventProducer implements CommandLineRunner {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(CartEventProducer.class, args);
    }

    public void run(String... args) throws Exception {
        rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("bike", 1));
        rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("computer", 2));
        rocketMQTemplate.convertAndSend("cart-item-removed-topic", new CartItemEvent("bike", 1));
    }
}

CartItemEvent состоит всего из двух свойств – идентификатора элемента и количество:

class CartItemEvent {
    private String itemId;
    private int quantity;

    // constructor, getters and setters
}

В приведенном выше примере мы используем метод convertAndSend(), общий метод, определенный абстрактным классом AbstractMessageSendingTemplate, для отправки событий корзины. Он принимает два параметра: пункт назначения, которым в нашем случае является название темы, и полезную нагрузку сообщения.

4. Потребитель сообщений

Использование сообщений RocketMQ так же просто, как создание компонента Spring с аннотацией @RocketMQMessageListener и реализация интерфейса RocketMQListener:

@SpringBootApplication
public class CartEventConsumer {

    public static void main(String[] args) {
        SpringApplication.run(CartEventConsumer.class, args);
    }

    @Service
    @RocketMQMessageListener(
      topic = "cart-item-add-topic",
      consumerGroup = "cart-consumer_cart-item-add-topic"
    )
    public class CardItemAddConsumer implements RocketMQListener<CartItemEvent> {
        public void onMessage(CartItemEvent addItemEvent) {
            log.info("Adding item: {}", addItemEvent);
            // additional logic
        }
    }

    @Service
    @RocketMQMessageListener(
      topic = "cart-item-removed-topic",
      consumerGroup = "cart-consumer_cart-item-removed-topic"
    )
    public class CardItemRemoveConsumer implements RocketMQListener<CartItemEvent> {
        public void onMessage(CartItemEvent removeItemEvent) {
            log.info("Removing item: {}", removeItemEvent);
            // additional logic
        }
    }
}

Нам нужно создать отдельный компонент для каждой темы сообщения, которую мы используем слушать. В каждом из этих прослушивателей мы определяем имя темы и имя группы потребителей с помощью аннотации @RocketMQMessageListener.

5. Синхронная и асинхронная передача

В предыдущих примерах мы использовали метод convertAndSend для отправки наших сообщений. Однако у нас есть и другие варианты.

Мы могли бы, например, вызвать syncSend, который отличается от convertAndSend, потому что он возвращает объект SendResult.

Его можно использовать, например, для проверки успешности отправки нашего сообщения или получения его идентификатора:

public void run(String... args) throws Exception { 
    SendResult addBikeResult = rocketMQTemplate.syncSend("cart-item-add-topic", 
      new CartItemEvent("bike", 1)); 
    SendResult addComputerResult = rocketMQTemplate.syncSend("cart-item-add-topic", 
      new CartItemEvent("computer", 2)); 
    SendResult removeBikeResult = rocketMQTemplate.syncSend("cart-item-removed-topic", 
      new CartItemEvent("bike", 1)); 
}

Как и convertAndSend, этот метод возвращается только после завершения процедуры отправки.

Мы должны использовать синхронную передачу в случаях, требующих высокой надежности, таких как важные уведомления или SMS-уведомления.

С другой стороны, вместо этого мы можем захотеть отправить сообщение асинхронно и получить уведомление, когда отправка завершится.

Мы можем сделать это с помощью asyncSend, который принимает SendCallback в качестве параметра и немедленно возвращает результат:

rocketMQTemplate.asyncSend("cart-item-add-topic", new CartItemEvent("bike", 1), new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        log.error("Successfully sent cart item");
    }

    @Override
    public void onException(Throwable throwable) {
        log.error("Exception during cart item sending", throwable);
    }
});

Мы используем асинхронную передачу в случаях, требующих высокой пропускной способности.

Наконец, для сценариев с очень высокими требованиями к пропускной способности мы можем использовать sendOneWay вместо asyncSend. sendOneWay отличается от asyncSend тем, что не гарантирует отправку сообщения.

Одностороннюю передачу также можно использовать для обычных случаев надежности, таких как сбор журналов.

6. Отправка сообщений в транзакции

RocketMQ предоставляет нам возможность отправлять сообщения в рамках транзакции. Мы можем сделать это с помощью метода sendInTransaction():

MessageBuilder.withPayload(new CartItemEvent("bike", 1)).build();
rocketMQTemplate.sendMessageInTransaction("test-transaction", "topic-name", msg, null);

Также мы должны реализовать интерфейс RocketMQLocalTransactionListener:

@RocketMQTransactionListener(txProducerGroup="test-transaction")
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
      @Override
      public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
          // ... local transaction process, return ROLLBACK, COMMIT or UNKNOWN
          return RocketMQLocalTransactionState.UNKNOWN;
      }

      @Override
      public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
          // ... check transaction status and return ROLLBACK, COMMIT or UNKNOWN
          return RocketMQLocalTransactionState.COMMIT;
      }
}

В sendMessageInTransaction() первым параметром является имя транзакции. Оно должно совпадать с полем участника @RocketMQTransactionListener txProducerGroup.

7. Конфигурация производителя сообщений

Мы также можем настроить аспекты самого производителя сообщений:

    «Rocketmq.producer.send-message-timeout: время ожидания отправки сообщения в миллисекундах — значение по умолчанию равно 3000. rocketmq.producer.compress-message-body-threshold: Порог, выше которого RocketMQ будет сжимать сообщения — значение по умолчанию. составляет 1024. Rocketmq.producer.max-message-size: Максимальный размер сообщения в байтах — значение по умолчанию равно 4096. Rocketmq.producer.retry-times-when-send-async-failed: Максимальное количество попыток выполнить внутренне в асинхронном режиме перед отправкой ошибки — значение по умолчанию равно 2. Rocketmq.producer.retry-next-server: указывает, следует ли повторить попытку другого брокера при внутренней отправке ошибки — значение по умолчанию — false. Rocketmq.producer.retry-times-when-send-failed: максимальное количество попыток внутреннего выполнения в асинхронном режиме перед отправкой ошибки — значение по умолчанию равно 2.

8. Заключение

В этой статье мы узнали, как отправлять и использовать сообщения с помощью Apache RocketMQ и Spring Boot. Как всегда, весь исходный код доступен на GitHub.