«1. Введение

Асинхронный обмен сообщениями — это тип слабосвязанной распределенной связи, который становится все более популярным для реализации архитектур, управляемых событиями. К счастью, Spring Framework предоставляет проект Spring AMQP, позволяющий нам создавать решения для обмена сообщениями на основе AMQP.

С другой стороны, работа с ошибками в таких средах может оказаться нетривиальной задачей. Итак, в этом уроке мы рассмотрим различные стратегии обработки ошибок.

2. Настройка среды

В этом руководстве мы будем использовать RabbitMQ, который реализует стандарт AMQP. Кроме того, Spring AMQP предоставляет модуль spring-rabbit, который делает интеграцию действительно простой.

Давайте запустим RabbitMQ как отдельный сервер. Мы запустим его в контейнере Docker, выполнив следующую команду:

docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management

Подробную информацию о конфигурации и настройке зависимостей проекта см. в нашей статье Spring AMQP.

3. Сценарий сбоя

Обычно существует больше типов ошибок, которые могут возникнуть в системах на основе обмена сообщениями по сравнению с монолитными или однопакетными приложениями из-за их распределенного характера.

Мы можем указать некоторые типы исключений:

    Связанные с сетью или вводом-выводом — общие сбои сетевых подключений и операций ввода-вывода. Связанные с протоколом или инфраструктурой — ошибки, которые обычно представляют собой неправильную настройку инфраструктуры обмена сообщениями. Связанные с брокером — сбои, которые предупреждают о неправильной настройке между клиентами и брокером AMQP. Например, достижение определенных пределов или пороговых значений, проверка подлинности или неверная конфигурация политик. Связанные с приложениями и сообщениями — исключения, которые обычно указывают на нарушение некоторых бизнес-правил или правил приложения. самый распространенный тип ошибок.

Следует отметить, что Spring AMQP по умолчанию обрабатывает проблемы, связанные с подключением, и проблемы низкого уровня, например, применяя политики повторных попыток или повторной постановки в очередь. Кроме того, большинство сбоев и ошибок преобразуются в AmqpException или один из его подклассов.

В следующих разделах мы в основном сосредоточимся на ошибках, характерных для приложений, и ошибках высокого уровня, а затем рассмотрим глобальные стратегии обработки ошибок.

4. Настройка проекта

Теперь давайте определим простую очередь и конфигурацию обмена для начала:

Затем давайте создадим простого производителя:

public static final String QUEUE_MESSAGES = "baeldung-messages-queue";
public static final String EXCHANGE_MESSAGES = "baeldung-messages-exchange";

@Bean
Queue messagesQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES)
      .build();
}
 
@Bean
DirectExchange messagesExchange() {
    return new DirectExchange(EXCHANGE_MESSAGES);
}
 
@Bean
Binding bindingMessages() {
    return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
}

И наконец, потребитель, выбрасывающий исключение:

public void sendMessage() {
    rabbitTemplate
      .convertAndSend(SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES,
        SimpleDLQAmqpConfiguration.QUEUE_MESSAGES, "Some message id:" + messageNumber++);
}

По умолчанию все ошибочные сообщения будут немедленно помещены в начало целевой очереди снова и снова.

@RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES)
public void receiveMessage(Message message) throws BusinessException {
    throw new BusinessException();
}

Давайте запустим наше тестовое приложение, выполнив следующую команду Maven:

Теперь мы должны увидеть аналогичный результирующий вывод:

mvn spring-boot:run -Dstart-class=com.baeldung.springamqp.errorhandling.ErrorHandlingApp

Следовательно, по умолчанию мы увидим бесконечное количество таких сообщений. на выходе.

WARN 22260 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
  Execution of Rabbit message listener failed.
Caused by: com.baeldung.springamqp.errorhandling.errorhandler.BusinessException: null

Чтобы изменить это поведение, у нас есть два варианта:

Установите для параметра default-requeue-rejected значение false на стороне слушателя — spring.rabbitmq.listener.simple.default-requeue-rejected=false Вызовите AmqpRejectAndDontRequeueException – это может быть полезно для сообщений, которые в будущем не будут иметь смысла, поэтому их можно будет отбросить.

    Теперь давайте узнаем, как более разумно обрабатывать ошибочные сообщения.

5. Очередь недоставленных сообщений

Очередь недоставленных сообщений (DLQ) — это очередь, которая содержит недоставленные или ошибочные сообщения. DLQ позволяет нам обрабатывать ошибочные или неверные сообщения, отслеживать шаблоны отказов и восстанавливаться после исключений в системе.

Что еще более важно, это помогает предотвратить бесконечные циклы в очередях, которые постоянно обрабатывают неверные сообщения и снижают производительность системы.

В целом существует две основные концепции: обмен недоставленными письмами (DLX) и сама очередь недоставленных писем (DLQ). По сути, DLX — это обычный обмен, который мы можем определить как один из распространенных типов: прямой, топик или разветвленный.

Очень важно понимать, что производитель ничего не знает об очередях. Он знает только об обмене, и все созданные сообщения маршрутизируются в соответствии с конфигурацией обмена и ключом маршрутизации сообщений.

«Теперь давайте посмотрим, как обрабатывать исключения, применяя подход Dead Letter Queue.

5.1. Базовая конфигурация

Чтобы настроить DLQ, нам нужно указать дополнительные аргументы при определении нашей очереди:

В приведенном выше примере мы использовали два дополнительных аргумента: x-dead-letter-exchange и x-ключ маршрутизации недоставленных писем. Пустое строковое значение для опции x-dead-letter-exchange указывает брокеру использовать обмен по умолчанию.

@Bean
Queue messagesQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES)
      .withArgument("x-dead-letter-exchange", "")
      .withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ)
      .build();
}
 
@Bean
Queue deadLetterQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}

Второй аргумент так же важен, как и установка ключей маршрутизации для простых сообщений. Эта опция изменяет первоначальный ключ маршрутизации сообщения для дальнейшей маршрутизации DLX.

5.2. Маршрутизация ошибочных сообщений

Таким образом, когда сообщение не удается доставить, оно направляется на сервер обмена недоставленными письмами. Но, как мы уже отмечали, DLX — это обычная биржа. Таким образом, если неверный ключ маршрутизации сообщения не соответствует обмену, он не будет доставлен в DLQ.

Таким образом, если в нашем примере мы опустим аргумент x-dead-letter-routing-key, ошибочное сообщение застрянет в бесконечном цикле повторных попыток.

Exchange: (AMQP default)
Routing Key: baeldung-messages-queue.dlq

Кроме того, исходная метаинформация сообщения доступна в заголовке x-death:

Приведенная выше информация доступна в консоли управления RabbitMQ, обычно работающей локально на порту 15672.

x-death:
  count: 1
  exchange: baeldung-messages-exchange
  queue: baeldung-messages-queue 
  reason: rejected
  routing-keys: baeldung-messages-queue 
  time: 1571232954

Помимо этой конфигурации , если мы используем Spring Cloud Stream, мы можем даже упростить процесс настройки, используя свойства конфигурации republishToDlq и autoBindDlq.

5.3. Обмен недоставленными письмами

В предыдущем разделе мы видели, что ключ маршрутизации изменяется, когда сообщение направляется в обмен недоставленными сообщениями. Но такое поведение не всегда желательно. Мы можем изменить его, настроив DLX самостоятельно и определив его с помощью типа разветвления:

На этот раз мы определили пользовательский обмен типа разветвления, поэтому сообщения будут отправляться во все ограниченные очереди. Кроме того, мы установили значение аргумента x-dead-letter-exchange в имя нашего DLX. В то же время мы удалили аргумент x-dead-letter-routing-key.

public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
 
@Bean
Queue messagesQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES)
      .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
      .build();
}
 
@Bean
FanoutExchange deadLetterExchange() {
    return new FanoutExchange(DLX_EXCHANGE_MESSAGES);
}
 
@Bean
Queue deadLetterQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}
 
@Bean
Binding deadLetterBinding() {
    return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
}

Теперь, если мы запустим наш пример, неудачное сообщение должно быть доставлено в DLQ, но без изменения начального ключа маршрутизации:

5.4. Обработка сообщений из очереди недоставленных сообщений

Exchange: baeldung-messages-queue.dlx
Routing Key: baeldung-messages-queue

Конечно, причина, по которой мы переместили их в очередь недоставленных сообщений, заключается в том, чтобы их можно было повторно обработать в другое время.

Давайте определим прослушиватель для очереди недоставленных сообщений:

Если мы сейчас запустим наш пример кода, мы должны увидеть вывод журнала:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessages(Message message) {
    log.info("Received failed message: {}", message.toString());
}

У нас есть ошибка сообщение, но что нам делать дальше? Ответ зависит от конкретных системных требований, вида исключения или типа сообщения.

WARN 11752 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
  Execution of Rabbit message listener failed.
INFO 11752 --- [ntContainer#1-1] c.b.s.e.consumer.SimpleDLQAmqpContainer  : 
  Received failed message:

Например, мы можем просто повторно поставить сообщение в очередь в исходное место назначения:

Но такая логика исключения не отличается от политики повторных попыток по умолчанию:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRequeue(Message failedMessage) {
    log.info("Received failed message, requeueing: {}", failedMessage.toString());
    rabbitTemplate.send(EXCHANGE_MESSAGES, 
      failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

В обычной стратегии может потребоваться повторная обработка сообщение n раз, а затем отклонить его. Давайте реализуем эту стратегию, используя заголовки сообщений:

INFO 23476 --- [ntContainer#0-1] c.b.s.e.c.RoutingDLQAmqpContainer        :
  Received message: 
WARN 23476 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
  Execution of Rabbit message listener failed.
INFO 23476 --- [ntContainer#1-1] c.b.s.e.c.RoutingDLQAmqpContainer        : 
  Received failed message, requeueing:

Сначала мы получаем значение заголовка x-retries-count, затем сравниваем это значение с максимально допустимым значением. Впоследствии, если счетчик достигает предельного количества попыток, сообщение будет отброшено:

public void processFailedMessagesRetryHeaders(Message failedMessage) {
    Integer retriesCnt = (Integer) failedMessage.getMessageProperties()
      .getHeaders().get(HEADER_X_RETRIES_COUNT);
    if (retriesCnt == null) retriesCnt = 1;
    if (retriesCnt > MAX_RETRIES_COUNT) {
        log.info("Discarding message");
        return;
    }
    log.info("Retrying message for the {} time", retriesCnt);
    failedMessage.getMessageProperties()
      .getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
    rabbitTemplate.send(EXCHANGE_MESSAGES, 
      failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

Мы должны добавить, что мы также можем использовать заголовок x-message-ttl, чтобы установить время, после которого сообщение должно быть отброшено. . Это может быть полезно для предотвращения бесконечного роста очередей.

WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer  : 
  Retrying message for the 1 time
WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer  : 
  Retrying message for the 2 time
WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer  : 
  Discarding message

5.5. Parking Lot Queue

С другой стороны, рассмотрим ситуацию, когда мы не можем просто отбросить сообщение, например, это может быть транзакция в банковском домене. Кроме того, иногда сообщение может потребовать ручной обработки или нам просто нужно записать сообщения, которые не удалось обработать более n раз.

Для подобных ситуаций существует понятие очереди на парковке. Мы можем пересылать все сообщения из DLQ, которые не прошли более допустимого количества раз, в очередь на парковку для дальнейшей обработки.

Давайте теперь реализуем эту идею:

«

public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot";
public static final String EXCHANGE_PARKING_LOT = QUEUE_MESSAGES + "exchange.parking-lot";
 
@Bean
FanoutExchange parkingLotExchange() {
    return new FanoutExchange(EXCHANGE_PARKING_LOT);
}
 
@Bean
Queue parkingLotQueue() {
    return QueueBuilder.durable(QUEUE_PARKING_LOT).build();
}
 
@Bean
Binding parkingLotBinding() {
    return BindingBuilder.bind(parkingLotQueue()).to(parkingLotExchange());
}

«Во-вторых, давайте реорганизуем логику прослушивателя, чтобы отправить сообщение в очередь парковки:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRetryWithParkingLot(Message failedMessage) {
    Integer retriesCnt = (Integer) failedMessage.getMessageProperties()
      .getHeaders().get(HEADER_X_RETRIES_COUNT);
    if (retriesCnt == null) retriesCnt = 1;
    if (retriesCnt > MAX_RETRIES_COUNT) {
        log.info("Sending message to the parking lot queue");
        rabbitTemplate.send(EXCHANGE_PARKING_LOT, 
          failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
        return;
    }
    log.info("Retrying message for the {} time", retriesCnt);
    failedMessage.getMessageProperties()
      .getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
    rabbitTemplate.send(EXCHANGE_MESSAGES, 
      failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

Наконец, нам также нужно обработать сообщения, поступающие в очередь парковки:

@RabbitListener(queues = QUEUE_PARKING_LOT)
public void processParkingLotQueue(Message failedMessage) {
    log.info("Received message in parking lot queue");
    // Save to DB or send a notification.
}

Теперь мы может сохранить неудачное сообщение в базе данных или, возможно, отправить уведомление по электронной почте.

Давайте проверим эту логику, запустив наше приложение:

WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Retrying message for the 1 time
WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Retrying message for the 2 time
WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Sending message to the parking lot queue
INFO 14768 --- [ntContainer#2-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Received message in parking lot queue

Как видно из вывода, после нескольких неудачных попыток сообщение было отправлено в очередь на парковку.

6. Пользовательская обработка ошибок

В предыдущем разделе мы видели, как обрабатывать сбои с помощью выделенных очередей и обменов. Однако иногда нам может понадобиться отловить все ошибки, например, для регистрации или сохранения их в базе данных.

6.1. Global ErrorHandler

До сих пор мы использовали SimpleRabbitListenerContainerFactory по умолчанию, и эта фабрика по умолчанию использует ConditionalRejectingErrorHandler. Этот обработчик перехватывает различные исключения и преобразует их в одно из исключений в иерархии AmqpException.

Важно отметить, что если нам нужно обрабатывать ошибки подключения, нам нужно реализовать интерфейс ApplicationListener.

Проще говоря, ConditionalRejectingErrorHandler решает, отклонять конкретное сообщение или нет. Если сообщение, вызвавшее исключение, отклонено, оно не будет помещено в очередь повторно.

Давайте определим пользовательский ErrorHandler, который будет просто запрашивать только BusinessExceptions:

public class CustomErrorHandler implements ErrorHandler {
    @Override
    public void handleError(Throwable t) {
        if (!(t.getCause() instanceof BusinessException)) {
            throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t);
        }
    }
}

Кроме того, когда мы выбрасываем исключение внутри нашего метода слушателя, оно заворачивается в ListenerExecutionFailedException. Итак, нам нужно вызвать метод getCause, чтобы получить исходное исключение.

6.2. FatalExceptionStrategy

Под капотом этот обработчик использует FatalExceptionStrategy для проверки того, следует ли считать исключение фатальным. В этом случае ошибочное сообщение будет отклонено.

По умолчанию эти исключения являются фатальными:

    MessageConversionException MessageConversionException MethodArgumentNotValidException MethodArgumentTypeMismatchException NoSuchMethodException ClassCastException

Вместо реализации интерфейса ErrorHandler мы можем просто предоставить нашу FatalExceptionStrategy:

public class CustomFatalExceptionStrategy 
      extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
    @Override
    public boolean isFatal(Throwable t) {
        return !(t.getCause() instanceof BusinessException);
    }
}

Наконец, нам нужно передать нашу пользовательскую в конструктор ConditionalRejectingErrorHandler:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
  ConnectionFactory connectionFactory,
  SimpleRabbitListenerContainerFactoryConfigurer configurer) {
      SimpleRabbitListenerContainerFactory factory = 
        new SimpleRabbitListenerContainerFactory();
      configurer.configure(factory, connectionFactory);
      factory.setErrorHandler(errorHandler());
      return factory;
}
 
@Bean
public ErrorHandler errorHandler() {
    return new ConditionalRejectingErrorHandler(customExceptionStrategy());
}
 
@Bean
FatalExceptionStrategy customExceptionStrategy() {
    return new CustomFatalExceptionStrategy();
}

7. Заключение

В этом руководстве мы обсудили различные способы обработки ошибок при использовании Spring AMQP и, в частности, RabbitMQ.

Каждой системе нужна определенная стратегия обработки ошибок. Мы рассмотрели наиболее распространенные способы обработки ошибок в архитектурах, управляемых событиями. Кроме того, мы увидели, что можем комбинировать несколько стратегий для создания более комплексного и надежного решения.

Как всегда, полный исходный код статьи доступен на GitHub.