«1. Обзор

В этом руководстве мы рассмотрим, как использовать Amazon SQS (Simple Queue Service) с помощью Java SDK.

2. Предварительные требования

Зависимости Maven, настройки учетной записи AWS и клиентское подключение, необходимые для использования Amazon AWS SDK для SQS, такие же, как в этой статье.

Предполагая, что мы создали экземпляр AWSCredentials, как описано в предыдущей статье, мы можем приступить к созданию нашего клиента SQS:

AmazonSQS sqs = AmazonSQSClientBuilder.standard()
  .withCredentials(new AWSStaticCredentialsProvider(credentials))
  .withRegion(Regions.US_EAST_1)
  .build();

3. Создание очередей

После того, как мы настроили наш SQS, создание очередей довольно просто.

3.1. Создание стандартной очереди

Давайте посмотрим, как мы можем создать стандартную очередь. Для этого нам нужно создать экземпляр CreateQueueRequest:

CreateQueueRequest createStandardQueueRequest = new CreateQueueRequest("baeldung-queue");
String standardQueueUrl = sqs.createQueue(createStandardQueueRequest).getQueueUrl();

3.2. Создание очереди FIFO

Создание очереди FIFO аналогично созданию стандартной очереди. Мы по-прежнему будем использовать экземпляр CreateQueueRequest, как и раньше. Только на этот раз нам нужно передать атрибуты очереди и установить для атрибута FifoQueue значение true:

Map<String, String> queueAttributes = new HashMap<>();
queueAttributes.put("FifoQueue", "true");
queueAttributes.put("ContentBasedDeduplication", "true");
CreateQueueRequest createFifoQueueRequest = new CreateQueueRequest(
  "baeldung-queue.fifo").withAttributes(queueAttributes);
String fifoQueueUrl = sqs.createQueue(createFifoQueueRequest)
  .getQueueUrl();

4. Публикация сообщений в очереди

Как только мы настроили очереди, мы можем отправка сообщений.

4.1. Отправка сообщения в стандартную очередь

Чтобы отправлять сообщения в стандартную очередь, нам нужно создать экземпляр SendMessageRequest.

Затем мы присоединяем к этому запросу карту атрибутов сообщения:

Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
messageAttributes.put("AttributeOne", new MessageAttributeValue()
  .withStringValue("This is an attribute")
  .withDataType("String"));  
    
SendMessageRequest sendMessageStandardQueue = new SendMessageRequest()
  .withQueueUrl(standardQueueUrl)
  .withMessageBody("A simple message.")
  .withDelaySeconds(30)
  .withMessageAttributes(messageAttributes);

sqs.sendMessage(sendMessageStandardQueue);

Функция withDelaySeconds() указывает, через какое время сообщение должно поступить в очередь.

4.2. Отправка сообщения в очередь FIFO

Единственная разница в этом случае заключается в том, что нам нужно указать группу, к которой принадлежит сообщение:

SendMessageRequest sendMessageFifoQueue = new SendMessageRequest()
  .withQueueUrl(fifoQueueUrl)
  .withMessageBody("Another simple message.")
  .withMessageGroupId("baeldung-group-1")
  .withMessageAttributes(messageAttributes);

Как вы можете видеть в приведенном выше примере кода, мы указываем группу, используя withMessageGroupId().

4.3. Отправка нескольких сообщений в очередь

Мы также можем отправить несколько сообщений в очередь, используя один запрос. Мы создадим список SendMessageBatchRequestEntry, который мы будем отправлять, используя экземпляр SendMessageBatchRequest:

List <SendMessageBatchRequestEntry> messageEntries = new ArrayList<>();
messageEntries.add(new SendMessageBatchRequestEntry()
  .withId("id-1")
  .withMessageBody("batch-1")
  .withMessageGroupId("baeldung-group-1"));
messageEntries.add(new SendMessageBatchRequestEntry()
  .withId("id-2")
  .withMessageBody("batch-2")
  .withMessageGroupId("baeldung-group-1"));

SendMessageBatchRequest sendMessageBatchRequest
 = new SendMessageBatchRequest(fifoQueueUrl, messageEntries);
sqs.sendMessageBatch(sendMessageBatchRequest);

5. Чтение сообщений из очередей

Мы можем получать сообщения из наших очередей, вызывая метод receiveMessage() для экземпляр ReceiveMessageRequest:

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(fifoQueueUrl)
  .withWaitTimeSeconds(10)
  .withMaxNumberOfMessages(10);

List<Message> sqsMessages = sqs.receiveMessage(receiveMessageRequest).getMessages();

Используя withMaxNumberOfMessages(), мы указываем, сколько сообщений нужно получить из очереди, хотя следует отметить, что максимальное количество сообщений равно 10.

Метод withWaitTimeSeconds() позволяет опрос. Длинный опрос — это способ ограничить количество запросов на получение сообщений, которые мы отправляем в SQS.

Проще говоря, это означает, что мы будем ждать до указанного количества секунд, чтобы получить сообщение. Если за это время в очереди нет сообщений, то запрос будет пустым. Если сообщение поступит в очередь в течение этого времени, оно будет возвращено.

Мы можем получить атрибуты и тело данного сообщения:

sqsMessages.get(0).getAttributes();
sqsMessages.get(0).getBody();

6. Удаление сообщения из очереди

Чтобы удалить сообщение, мы будем использовать DeleteMessageRequest:

sqs.deleteMessage(new DeleteMessageRequest()
  .withQueueUrl(fifoQueueUrl)
  .withReceiptHandle(sqsMessages.get(0).getReceiptHandle()));

7. Очереди недоставленных сообщений

Очередь недоставленных сообщений должна быть того же типа, что и ее базовая очередь — она должна быть FIFO, если базовая очередь FIFO, и стандартная, если базовая очередь стандартная. В этом примере мы будем использовать стандартную очередь.

Первое, что нам нужно сделать, это создать то, что станет нашей очередью недоставленных сообщений:

String deadLetterQueueUrl = sqs.createQueue("baeldung-dead-letter-queue").getQueueUrl();

Затем мы получим ARN (имя ресурса Amazon) нашей только что созданной очереди:

GetQueueAttributesResult deadLetterQueueAttributes = sqs.getQueueAttributes(
  new GetQueueAttributesRequest(deadLetterQueueUrl)
    .withAttributeNames("QueueArn"));

String deadLetterQueueARN = deadLetterQueueAttributes.getAttributes()
  .get("QueueArn");

Наконец, мы устанавливаем эту вновь созданную очередь в качестве очереди недоставленных сообщений исходной стандартной очереди:

SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest()
  .withQueueUrl(standardQueueUrl)
  .addAttributesEntry("RedrivePolicy",
    "{\"maxReceiveCount\":\"2\", "
      + "\"deadLetterTargetArn\":\"" + deadLetterQueueARN + "\"}");

sqs.setQueueAttributes(queueAttributesRequest);

Пакет JSON, который мы установили в методе addAttributesEntry() при создании экземпляра SetQueueAttributesRequest, содержит необходимую нам информацию: maxReceiveCount равен 2. , что означает, что если сообщение получено столько раз, считается, что оно не было обработано правильно, и отправляется в нашу очередь недоставленных сообщений.

Атрибут deadLetterTargetArn указывает нашей стандартной очереди на нашу только что созданную очередь недоставленных сообщений.

8. Мониторинг

С помощью SDK мы можем проверить, сколько сообщений в данный момент находится в заданной очереди и сколько находится в обработке. Во-первых, нам нужно создать GetQueueAttributesRequest.

Оттуда мы проверим состояние очереди:

GetQueueAttributesRequest getQueueAttributesRequest 
  = new GetQueueAttributesRequest(standardQueueUrl)
    .withAttributeNames("All");
GetQueueAttributesResult getQueueAttributesResult 
  = sqs.getQueueAttributes(getQueueAttributesRequest);
System.out.println(String.format("The number of messages on the queue: %s", 
  getQueueAttributesResult.getAttributes()
    .get("ApproximateNumberOfMessages")));
System.out.println(String.format("The number of messages in flight: %s", 
  getQueueAttributesResult.getAttributes()
    .get("ApproximateNumberOfMessagesNotVisible")));

«

«Более глубокий мониторинг можно обеспечить с помощью Amazon Cloud Watch.

9. Заключение

В этой статье мы рассмотрели, как управлять очередями SQS с помощью AWS Java SDK.