«1. Введение

По умолчанию в Spring AMQP ошибочное сообщение повторно ставится в очередь для следующего раунда потребления. Следовательно, может возникнуть бесконечный цикл потребления, что приведет к нестабильной ситуации и пустой трате ресурсов.

Хотя использование очереди недоставленных сообщений является стандартным способом обработки ошибочных сообщений, мы можем захотеть повторить попытку использования сообщений и вернуть систему в нормальное состояние.

В этом уроке мы представим два разных способа реализации стратегии повторных попыток под названием «Экспоненциальная отсрочка».

2. Предварительные условия

В этом руководстве мы будем использовать RabbitMQ, популярную реализацию AMQP. Следовательно, мы можем обратиться к этой статье Spring AMQP для получения дальнейших инструкций о том, как настроить и использовать RabbitMQ с Spring.

Ради простоты мы также будем использовать образ докера для нашего экземпляра RabbitMQ, хотя подойдет любой экземпляр RabbitMQ, прослушивающий порт 5672.

Давайте запустим док-контейнер RabbitMQ:

docker run -p 5672:5672 -p 15672:15672 --name rabbit rabbitmq:3-management

Чтобы реализовать наши примеры, нам нужно добавить зависимость от spring-boot-starter-amqp. Последняя версия доступна на Maven Central:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.2.4.RELEASE</version>
    </dependency>
</dependencies>

3. Способ блокировки

Наш первый способ будет использовать фикстуры Spring Retry. Мы создадим простую очередь и потребителя, настроенного на ожидание в течение некоторого времени между повторными попытками ошибочного сообщения.

Во-первых, давайте создадим нашу очередь:

@Bean
public Queue blockingQueue() {
    return QueueBuilder.nonDurable("blocking-queue").build();
}

Во-вторых, давайте настроим стратегию отсрочки в RetryOperationsInterceptor и подключим ее к пользовательской RabbitListenerContainerFactory:

@Bean
public RetryOperationsInterceptor retryInterceptor() {
    return RetryInterceptorBuilder.stateless()
      .backOffOptions(1000, 3.0, 10000)
      .maxAttempts(5)
      .recoverer(observableRecoverer())
      .build();
}

@Bean
public SimpleRabbitListenerContainerFactory retryContainerFactory(
  ConnectionFactory connectionFactory, RetryOperationsInterceptor retryInterceptor) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);

    Advice[] adviceChain = { retryInterceptor };
    factory.setAdviceChain(adviceChain);

    return factory;
}

Как показано выше, мы настраиваем начальный интервал 1000 мс и множитель 3,0, до максимального времени ожидания 10000 мс. Кроме того, после пяти попыток сообщение будет удалено.

Давайте добавим нашего потребителя и вызовем сообщение о сбое, создав исключение:

@RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory")
public void consumeBlocking(String payload) throws Exception {
    logger.info("Processing message from blocking-queue: {}", payload);

    throw new Exception("exception occured!");
}

Наконец, давайте создадим тест и отправим два сообщения в нашу очередь:

@Test
public void whenSendToBlockingQueue_thenAllMessagesProcessed() throws Exception {
    int nb = 2;

    CountDownLatch latch = new CountDownLatch(nb);
    observableRecoverer.setObserver(() -> latch.countDown());

    for (int i = 1; i <= nb; i++) {
        rabbitTemplate.convertAndSend("blocking-queue", "blocking message " + i);
    }

    latch.await();
}

Имейте в виду, что CountdownLatch используется только как тестовое приспособление.

Давайте запустим тест и проверим вывод нашего журнала:

2020-02-18 21:17:55.638  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:17:56.641  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:17:59.644  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:08.654  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:18.657  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:18.875  ERROR : java.lang.Exception: exception occured!
2020-02-18 21:18:18.858  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:19.860  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:22.863  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:31.867  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:41.871  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:41.875 ERROR : java.lang.Exception: exception occured!

Как видно, этот журнал правильно показывает экспоненциальное время ожидания между каждой повторной попыткой. Пока наша стратегия отсрочки работает, наш потребитель блокируется до исчерпания повторных попыток. Тривиальное улучшение состоит в том, чтобы заставить нашего потребителя выполняться одновременно, установив атрибут параллелизма @RabbitListener:

@RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory", concurrency = "2")

Однако повторная попытка сообщения по-прежнему блокирует экземпляр потребителя. Поэтому приложение может страдать от проблем с задержкой.

В следующем разделе мы представим неблокирующий способ реализации подобной стратегии.

4. Неблокирующий способ

Альтернативный способ включает несколько очередей повторных попыток в сочетании с истечением срока действия сообщения. На самом деле, когда срок действия сообщения истекает, оно попадает в очередь недоставленных сообщений. Другими словами, если потребитель DLQ отправляет сообщение обратно в исходную очередь, мы, по сути, выполняем цикл повторных попыток.

В результате количество используемых очередей повторных попыток равно количеству попыток.

Во-первых, давайте создадим очередь недоставленных сообщений для наших очередей повторных попыток:

@Bean
public Queue retryWaitEndedQueue() {
    return QueueBuilder.nonDurable("retry-wait-ended-queue").build();
}

Давайте добавим получателя в очередь повторных недоставленных сообщений. Единственная ответственность этого потребителя заключается в отправке сообщения обратно в исходную очередь:

@RabbitListener(queues = "retry-wait-ended-queue", containerFactory = "defaultContainerFactory")
public void consumeRetryWaitEndedMessage(String payload, Message message, Channel channel) throws Exception{
    MessageProperties props = message.getMessageProperties();

    rabbitTemplate().convertAndSend(props.getHeader("x-original-exchange"), 
      props.getHeader("x-original-routing-key"), message);
}

Во-вторых, давайте создадим объект-оболочку для наших очередей повторных попыток. Этот объект будет содержать экспоненциальную конфигурацию отсрочки:

public class RetryQueues {
    private Queue[] queues;
    private long initialInterval;
    private double factor;
    private long maxWait;

    // constructor, getters and setters

В-третьих, давайте определим три очереди повторных попыток:

@Bean
public Queue retryQueue1() {
    return QueueBuilder.nonDurable("retry-queue-1")
      .deadLetterExchange("")
      .deadLetterRoutingKey("retry-wait-ended-queue")
      .build();
}

@Bean
public Queue retryQueue2() {
    return QueueBuilder.nonDurable("retry-queue-2")
      .deadLetterExchange("")
      .deadLetterRoutingKey("retry-wait-ended-queue")
      .build();
}

@Bean
public Queue retryQueue3() {
    return QueueBuilder.nonDurable("retry-queue-3")
      .deadLetterExchange("")
      .deadLetterRoutingKey("retry-wait-ended-queue")
      .build();
}

@Bean
public RetryQueues retryQueues() {
    return new RetryQueues(1000, 3.0, 10000, retryQueue1(), retryQueue2(), retryQueue3());
}

Затем нам нужен перехватчик для обработки потребления сообщений:

public class RetryQueuesInterceptor implements MethodInterceptor {

    // fields and constructor

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        return tryConsume(invocation, this::ack, (messageAndChannel, e) -> {
            try {
                int retryCount = tryGetRetryCountOrFail(messageAndChannel, e);
                sendToNextRetryQueue(messageAndChannel, retryCount);
            } catch (Throwable t) {
                // ...
                throw new RuntimeException(t);
            }
        });
    }

В случае успешного возврата потребителя, мы просто подтверждаем сообщение.

Однако, если потребитель выдает исключение и остаются попытки, мы отправляем сообщение в следующую очередь повторных попыток:

private void sendToNextRetryQueue(MessageAndChannel mac, int retryCount) throws Exception {
    String retryQueueName = retryQueues.getQueueName(retryCount);

    rabbitTemplate.convertAndSend(retryQueueName, mac.message, m -> {
        MessageProperties props = m.getMessageProperties();
        props.setExpiration(String.valueOf(retryQueues.getTimeToWait(retryCount)));
        props.setHeader("x-retried-count", String.valueOf(retryCount + 1));
        props.setHeader("x-original-exchange", props.getReceivedExchange());
        props.setHeader("x-original-routing-key", props.getReceivedRoutingKey());

        return m;
    });

    mac.channel.basicReject(mac.message.getMessageProperties()
      .getDeliveryTag(), false);
}

Опять же, давайте подключим наш перехватчик к пользовательской RabbitListenerContainerFactory:

@Bean
public SimpleRabbitListenerContainerFactory retryQueuesContainerFactory(
  ConnectionFactory connectionFactory, RetryQueuesInterceptor retryInterceptor) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);

    Advice[] adviceChain = { retryInterceptor };
    factory.setAdviceChain(adviceChain);

    return factory;
}

Наконец, мы определяем нашу основную очередь и потребителя, который имитирует ошибочное сообщение:

@Bean
public Queue nonBlockingQueue() {
    return QueueBuilder.nonDurable("non-blocking-queue")
      .build();
}

@RabbitListener(queues = "non-blocking-queue", containerFactory = "retryQueuesContainerFactory", 
  ackMode = "MANUAL")
public void consumeNonBlocking(String payload) throws Exception {
    logger.info("Processing message from non-blocking-queue: {}", payload);

    throw new Exception("Error occured!");
}

Давайте создадим еще один тест и отправим два сообщения:

@Test
public void whenSendToNonBlockingQueue_thenAllMessageProcessed() throws Exception {
    int nb = 2;

    CountDownLatch latch = new CountDownLatch(nb);
    retryQueues.setObserver(() -> latch.countDown());

    for (int i = 1; i <= nb; i++) {
        rabbitTemplate.convertAndSend("non-blocking-queue", "non-blocking message " + i);
    }

    latch.await();
}

Затем давайте запустим наш тест и проверим журнал: ~~ ~

2020-02-19 10:31:40.640  INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:40.656  INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:41.620  INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:41.623  INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:44.415  INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:44.420  INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:52.751  INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:52.774 ERROR : java.lang.Exception: Error occured!
2020-02-19 10:31:52.829  INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:52.841 ERROR : java.lang.Exception: Error occured!

«Опять же, мы видим экспоненциальное время ожидания между каждой повторной попыткой. Однако вместо блокировки до тех пор, пока не будет предпринята каждая попытка, сообщения обрабатываются одновременно.

Хотя эта настройка довольно гибкая и помогает решить проблемы с задержкой, существует распространенная ошибка. Действительно, RabbitMQ удаляет сообщение с истекшим сроком действия только тогда, когда оно достигает головы очереди. Поэтому, если сообщение имеет больший срок действия, оно заблокирует все остальные сообщения в очереди. По этой причине очередь ответов должна содержать только сообщения с одинаковым сроком действия.

4. Заключение

Как показано выше, событийные системы могут реализовать экспоненциальную стратегию отсрочки для повышения отказоустойчивости. Хотя внедрение таких решений может быть тривиальным, важно понимать, что определенное решение может быть хорошо адаптировано к небольшой системе, но вызывать проблемы с задержкой в ​​экосистемах с высокой пропускной способностью.

Исходный код доступен на GitHub.