«1. Цель

В предыдущей статье о загрузке S3 мы рассмотрели, как мы можем использовать общие API-интерфейсы Blob из jclouds для загрузки контента в S3. В этой статье мы будем использовать специальный асинхронный API S3 от jclouds для загрузки контента и использовать функцию многокомпонентной загрузки, предоставляемую S3.

2. Подготовка

2.1. Настройка пользовательского API

Первая часть процесса загрузки — создание API jclouds — это пользовательский API для Amazon S3:

public AWSS3AsyncClient s3AsyncClient() {
   String identity = ...
   String credentials = ...

   BlobStoreContext context = ContextBuilder.newBuilder("aws-s3").
      credentials(identity, credentials).buildView(BlobStoreContext.class);

   RestContext<AWSS3Client, AWSS3AsyncClient> providerContext = context.unwrap();
   return providerContext.getAsyncApi();
}

2.2. Определение количества частей содержимого

Amazon S3 имеет ограничение в 5 МБ для каждой загружаемой части. Таким образом, первое, что нам нужно сделать, это определить правильное количество частей, на которые мы можем разделить наш контент, чтобы у нас не было частей меньше этого предела в 5 МБ:

public static int getMaximumNumberOfParts(byte[] byteArray) {
   int numberOfParts= byteArray.length / fiveMB; // 5*1024*1024
   if (numberOfParts== 0) {
      return 1;
   }
   return numberOfParts;
}

2.3. Разбиение содержимого на части

Мы собираемся разбить байтовый массив на заданное количество частей:

public static List<byte[]> breakByteArrayIntoParts(byte[] byteArray, int maxNumberOfParts) {
   List<byte[]> parts = Lists.<byte[]> newArrayListWithCapacity(maxNumberOfParts);
   int fullSize = byteArray.length;
   long dimensionOfPart = fullSize / maxNumberOfParts;
   for (int i = 0; i < maxNumberOfParts; i++) {
      int previousSplitPoint = (int) (dimensionOfPart * i);
      int splitPoint = (int) (dimensionOfPart * (i + 1));
      if (i == (maxNumberOfParts - 1)) {
         splitPoint = fullSize;
      }
      byte[] partBytes = Arrays.copyOfRange(byteArray, previousSplitPoint, splitPoint);
      parts.add(partBytes);
   }

   return parts;
}

Мы собираемся проверить логику разбиения байтового массива на части — мы собираемся чтобы сгенерировать несколько байтов, разбить массив байтов, снова собрать его вместе с помощью Guava и убедиться, что мы получаем исходный код:

@Test
public void given16MByteArray_whenFileBytesAreSplitInto3_thenTheSplitIsCorrect() {
   byte[] byteArray = randomByteData(16);

   int maximumNumberOfParts = S3Util.getMaximumNumberOfParts(byteArray);
   List<byte[]> fileParts = S3Util.breakByteArrayIntoParts(byteArray, maximumNumberOfParts);

   assertThat(fileParts.get(0).length + fileParts.get(1).length + fileParts.get(2).length, 
      equalTo(byteArray.length));
   byte[] unmultiplexed = Bytes.concat(fileParts.get(0), fileParts.get(1), fileParts.get(2));
   assertThat(byteArray, equalTo(unmultiplexed));
}

Чтобы сгенерировать данные, мы просто используем поддержку из Random:

byte[] randomByteData(int mb) {
   byte[] randomBytes = new byte[mb * 1024 * 1024];
   new Random().nextBytes(randomBytes);
   return randomBytes;
}

2.4. Создание полезных нагрузок

Теперь, когда мы определили правильное количество частей для нашего контента и нам удалось разбить контент на части, нам нужно сгенерировать объекты полезной нагрузки для jclouds API:

public static List<Payload> createPayloadsOutOfParts(Iterable<byte[]> fileParts) {
   List<Payload> payloads = Lists.newArrayList();
   for (byte[] filePart : fileParts) {
      byte[] partMd5Bytes = Hashing.md5().hashBytes(filePart).asBytes();
      Payload partPayload = Payloads.newByteArrayPayload(filePart);
      partPayload.getContentMetadata().setContentLength((long) filePart.length);
      partPayload.getContentMetadata().setContentMD5(partMd5Bytes);
      payloads.add(partPayload);
   }
   return payloads;
}

3. Загрузить

Процесс загрузки представляет собой гибкий многоэтапный процесс. Это означает:

    загрузку можно начать до того, как будут получены все данные — данные можно загружать по мере их поступления данные загружаются фрагментами — «Если одна из этих операций не удалась, ее можно просто извлечь, чанки можно загружать параллельно — это может значительно увеличить скорость загрузки, особенно в случае больших файлов

3.1. Инициирование операции загрузки

Первым шагом операции загрузки является запуск процесса. Этот запрос к S3 должен содержать стандартные заголовки HTTP — в частности, необходимо вычислить заголовок Content-MD5. Мы собирались использовать поддержку хэш-функции Guava здесь:

Hashing.md5().hashBytes(byteArray).asBytes();

Это хэш md5 всего массива байтов, а не его частей.

Чтобы начать загрузку и для всех дальнейших взаимодействий с S3, мы собираемся использовать AWSS3AsyncClient — асинхронный API, который мы создали ранее:

ObjectMetadata metadata = ObjectMetadataBuilder.create().key(key).contentMD5(md5Bytes).build();
String uploadId = s3AsyncApi.initiateMultipartUpload(container, metadata).get();

Ключ — это дескриптор, назначенный объекту — – это должен быть уникальный идентификатор, указанный клиентом.

Также обратите внимание, что, несмотря на то, что мы используем асинхронную версию API, мы блокируем результат этой операции — это потому, что нам понадобится результат инициализации, чтобы двигаться вперед. .

Результатом операции является идентификатор загрузки, возвращаемый S3. Он будет идентифицировать загрузку на протяжении всего ее жизненного цикла и будет присутствовать во всех последующих операциях загрузки.

3.2. Загрузка частей

Следующим шагом будет загрузка частей. Наша цель здесь — отправлять эти запросы параллельно, так как операция загрузки частей представляет собой основную часть процесса загрузки:

List<ListenableFuture<String>> ongoingOperations = Lists.newArrayList();
for (int partNumber = 0; partNumber < filePartsAsByteArrays.size(); partNumber++) {
   ListenableFuture<String> future = s3AsyncApi.uploadPart(
      container, key, partNumber + 1, uploadId, payloads.get(partNumber));
   ongoingOperations.add(future);
}

Номера частей должны быть непрерывными, но порядок, в котором отправляются запросы, не имеет значения.

После того, как все запросы на загрузку отправлены, нам нужно дождаться их ответов, чтобы мы могли собрать индивидуальное значение ETag для каждой части:

Function<ListenableFuture<String>, String> getEtagFromOp = 
  new Function<ListenableFuture<String>, String>() {
   public String apply(ListenableFuture<String> ongoingOperation) {
      try {
         return ongoingOperation.get();
      } catch (InterruptedException | ExecutionException e) {
         throw new IllegalStateException(e);
      }
   }
};
List<String> etagsOfParts = Lists.transform(ongoingOperations, getEtagFromOp);

Если по какой-либо причине один из запросов на загрузку части операций не удается, операцию можно повторять до тех пор, пока она не завершится успешно. Приведенная выше логика не содержит механизма повторных попыток, но его создание должно быть достаточно простым.

3.3. Завершение операции загрузки

Заключительный этап процесса загрузки — завершение многокомпонентной операции. API S3 требует загрузки ответов из предыдущих частей в виде карты, которую теперь мы можем легко создать из списка ETag, полученного выше:

Map<Integer, String> parts = Maps.newHashMap();
for (int i = 0; i < etagsOfParts.size(); i++) {
   parts.put(i + 1, etagsOfParts.get(i));
}

И, наконец, отправьте полный запрос:

s3AsyncApi.completeMultipartUpload(container, key, uploadId, parts).get();

~~ ~ Это вернет окончательный ETag готового объекта и завершит весь процесс загрузки.

4. Заключение

«В этой статье мы создали многокомпонентную, полностью параллельную операцию загрузки в S3, используя пользовательский API jclouds S3. Эта операция готова к использованию как есть, но ее можно улучшить несколькими способами.

Во-первых, следует добавить логику повторных попыток для операций загрузки, чтобы лучше справляться с ошибками.

Далее, для действительно больших файлов, даже несмотря на то, что механизм отправляет все составные запросы на загрузку параллельно, механизм регулирования должен по-прежнему ограничивать количество отправляемых параллельных запросов. Это нужно как для того, чтобы пропускная способность не стала узким местом, так и для того, чтобы убедиться, что сам Amazon не помечает процесс загрузки как превышение разрешенного лимита запросов в секунду — Guava RateLimiter потенциально может очень хорошо подходить для этого.