«1. Введение

AWS предлагает множество сервисов через многочисленные API, к которым мы можем получить доступ из Java, используя их официальный SDK. Однако до недавнего времени этот SDK не предлагал поддержку реактивных операций и имел лишь ограниченную поддержку асинхронного доступа.

С выпуском AWS SDK для Java 2.0 мы теперь можем использовать эти API в полностью неблокирующем режиме ввода-вывода благодаря принятию стандарта Reactive Streams.

В этом руководстве мы рассмотрим эти новые функции, реализуя простой REST API хранилища больших двоичных объектов в Spring Boot, который использует известную службу S3 в качестве серверной части хранилища.

2. Обзор операций AWS S3

Прежде чем углубляться в реализацию, давайте сделаем краткий обзор того, чего мы хотим здесь достичь. Типичная служба хранилища BLOB-объектов предоставляет операции CRUD, которые использует интерфейсное приложение, чтобы позволить конечному пользователю загружать, перечислять, скачивать и удалять несколько типов содержимого, например аудио, изображения и документы.

Распространенная проблема, с которой приходится сталкиваться традиционным реализациям, заключается в том, как эффективно обрабатывать большие файлы или медленные соединения. В ранних версиях (до сервлета 3.0) все, что могла предложить спецификация JavaEE, — это блокирующий API, поэтому нам требовался поток для каждого параллельного клиента хранилища BLOB-объектов. У этой модели есть недостаток, который требует больше серверных ресурсов (следовательно, больших машин) и делает их более уязвимыми для атак типа DoS:

Используя реактивный стек, мы можем сделать наш сервис гораздо менее ресурсоемким для того же количества клиентов. В реализации реактора используется небольшое количество потоков, которые отправляются в ответ на события завершения ввода-вывода, такие как доступность новых данных для чтения или завершение предыдущей записи.

Это означает, что один и тот же поток продолжает обрабатывать эти события, которые могут исходить от любого из активных клиентских подключений, до тех пор, пока не останется доступной работы. Этот подход значительно сокращает количество переключений контекста — довольно дорогостоящую операцию — и позволяет очень эффективно использовать доступные ресурсы:

3. Настройка проекта

Наш демонстрационный проект представляет собой стандартное приложение Spring Boot WebFlux. который включает в себя обычные зависимости поддержки, такие как Lombok и JUnit.

В дополнение к этим библиотекам нам нужно добавить AWS SDK для зависимостей Java V2:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>bom</artifactId>
            <version>2.10.1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>s3</artifactId>
        <scope>compile</scope>
    </dependency>

    <dependency>
        <artifactId>netty-nio-client</artifactId>
        <groupId>software.amazon.awssdk</groupId>
        <scope>compile</scope>
    </dependency>
</dependencies>

AWS SDK предоставляет спецификацию, определяющую требуемые версии для всех зависимостей, поэтому нам не нужно указывать их в разделе зависимостей нашего файла POM.

Мы добавили клиентскую библиотеку S3, которая будет включать в себя другие основные зависимости от SDK. Нам также понадобится клиентская библиотека Netty, поскольку мы будем использовать асинхронные API для взаимодействия с AWS.

Официальная документация AWS содержит более подробную информацию о доступных транспортах.

4. Создание клиента AWS S3

Точкой входа для операций S3 является класс S3AsyncClient, который мы будем использовать для запуска новых вызовов API.

Поскольку нам нужен только один экземпляр этого класса, давайте создадим класс @Configuration с методом @Bean, который его создаст, чтобы мы могли внедрить его туда, куда нам нужно:

@Configuration
@EnableConfigurationProperties(S3ClientConfigurarionProperties.class)
public class S3ClientConfiguration {
    @Bean
    public S3AsyncClient s3client(S3ClientConfigurarionProperties s3props, 
      AwsCredentialsProvider credentialsProvider) {
        SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
          .writeTimeout(Duration.ZERO)
          .maxConcurrency(64)
          .build();
        S3Configuration serviceConfiguration = S3Configuration.builder()
          .checksumValidationEnabled(false)
          .chunkedEncodingEnabled(true)
          .build();
        S3AsyncClientBuilder b = S3AsyncClient.builder().httpClient(httpClient)
          .region(s3props.getRegion())
          .credentialsProvider(credentialsProvider)
          .serviceConfiguration(serviceConfiguration);

        if (s3props.getEndpoint() != null) {
            b = b.endpointOverride(s3props.getEndpoint());
        }
        return b.build();
    }
}

Для этой демонстрации мы вы используете минимальный класс @ConfigurationProperties (доступный в нашем репозитории), который содержит следующую информацию, необходимую для доступа к сервисам S3:

    регион: допустимый идентификатор региона AWS, например, us-east-1 accessKeyId/secretAccessKey: наш Конечная точка ключа и идентификатора API AWS: необязательный URI, который мы можем использовать для переопределения конечной точки службы S3 по умолчанию. Основной вариант использования — использование демо-кода с альтернативными решениями для хранения данных, которые предлагают S3-совместимый API (примерами являются minio и localstack) Bucket: Имя корзины, в которой мы будем хранить загруженные файлы

Есть несколько важных моментов упоминание о коде инициализации клиента. Во-первых, мы отключаем тайм-ауты записи и увеличиваем максимальный параллелизм, чтобы загрузка могла выполняться даже в условиях низкой пропускной способности.

«Во-вторых, мы отключаем проверку контрольной суммы и включаем кодирование по частям. Мы делаем это, потому что хотим начать загрузку данных в корзину, как только данные поступят в нашу службу в потоковом режиме.

Наконец, мы не рассматриваем само создание корзины, так как предполагаем, что она уже создана и настроена администратором.

Что касается учетных данных, мы предоставляем настроенный AwsCredentialsProvider, который может восстанавливать учетные данные из свойств Spring. Это открывает возможность вводить эти значения через абстракцию Spring Environment и все ее поддерживаемые реализации PropertySource, такие как Vault или Config Server:

@Bean
public AwsCredentialsProvider awsCredentialsProvider(S3ClientConfigurarionProperties s3props) {
    if (StringUtils.isBlank(s3props.getAccessKeyId())) {
        return DefaultCredentialsProvider.create();
    } else {
        return () -> {
            return AwsBasicCredentials.create(
              s3props.getAccessKeyId(),
              s3props.getSecretAccessKey());
        };
    }
}

5. Обзор службы загрузки

Теперь мы реализуем службу загрузки, которая мы будем доступны по пути /inbox.

POST для этого пути к ресурсу сохранит файл в нашей корзине S3 под случайно сгенерированным ключом. Мы сохраним исходное имя файла в качестве ключа метаданных, чтобы мы могли использовать его для создания соответствующих заголовков загрузки HTTP для браузеров.

Нам нужно обработать два различных сценария: простую загрузку и загрузку из нескольких частей. Давайте продолжим и создадим @RestController и добавим методы для обработки этих сценариев:

@RestController
@RequestMapping("/inbox")
@Slf4j
public class UploadResource {
    private final S3AsyncClient s3client;
    private final S3ClientConfigurarionProperties s3config;

    public UploadResource(S3AsyncClient s3client, S3ClientConfigurarionProperties s3config) {
        this.s3client = s3client;
        this.s3config = s3config;        
    }
    
    @PostMapping
    public Mono<ResponseEntity<UploadResult>> uploadHandler(
      @RequestHeader HttpHeaders headers, 
      @RequestBody Flux<ByteBuffer> body) {
      // ... see section 6
    }

    @RequestMapping(
      consumes = MediaType.MULTIPART_FORM_DATA_VALUE,
      method = {RequestMethod.POST, RequestMethod.PUT})
    public Mono<ResponseEntity<UploadResult>> multipartUploadHandler(
      @RequestHeader HttpHeaders headers,
      @RequestBody Flux<Part> parts ) {
      // ... see section 7
    }
}

Сигнатуры обработчиков отражают основное различие между обоими случаями: в простом случае тело содержит само содержимое файла, тогда как в многокомпонентном случае он может иметь несколько «частей», каждая из которых соответствует файлу или данным формы.

Для удобства мы будем поддерживать многокомпонентную загрузку с использованием методов POST или PUT. Причина этого в том, что некоторые инструменты (в частности, cURL) используют последний по умолчанию при загрузке файлов с параметром -F.

В обоих случаях мы вернем UploadResult, содержащий результат операции и сгенерированные файловые ключи, которые клиент должен использовать для восстановления исходных файлов — подробнее об этом позже!

6. Загрузка одного файла

В этом случае клиенты отправляют контент в простой операции POST с телом запроса, содержащим необработанные данные. Чтобы получить этот контент в реактивном веб-приложении, все, что нам нужно сделать, — это объявить метод @PostMapping, который принимает аргумент Flux\u003cByteBuffer\u003e.

Потоковая передача этого потока в новый файл S3 в этом случае проста.

Все, что нам нужно, это создать PutObjectRequest со сгенерированным ключом, длиной файла, типом содержимого MIME и передать его методу putObject() в нашем клиенте S3:

@PostMapping
public Mono<ResponseEntity<UploadResult>> uploadHandler(@RequestHeader HttpHeaders headers,
  @RequestBody Flux<ByteBuffer> body) {
    // ... some validation code omitted
    String fileKey = UUID.randomUUID().toString();
    MediaType mediaType = headers.getContentType();

    if (mediaType == null) {
        mediaType = MediaType.APPLICATION_OCTET_STREAM;
    }
    CompletableFuture future = s3client
      .putObject(PutObjectRequest.builder()
        .bucket(s3config.getBucket())
        .contentLength(length)
        .key(fileKey.toString())
        .contentType(mediaType.toString())
        .metadata(metadata)
        .build(), 
      AsyncRequestBody.fromPublisher(body));

    return Mono.fromFuture(future)
      .map((response) -> {
        checkResult(response);
        return ResponseEntity
          .status(HttpStatus.CREATED)
          .body(new UploadResult(HttpStatus.CREATED, new String[] {fileKey}));
        });
}

Ключевым моментом здесь является то, как мы передаем входящий Flux методу putObject().

Этот метод ожидает объект AsyncRequestBody, предоставляющий содержимое по запросу. По сути, это обычный издатель с некоторыми дополнительными удобными методами. В нашем случае мы воспользуемся преимуществами метода fromPublisher() для преобразования нашего Flux в требуемый тип.

Кроме того, мы предполагаем, что клиент отправит HTTP-заголовок Content-Length с правильным значением. Без этой информации вызов не удастся, так как это обязательное поле.

Асинхронные методы в SDK V2 всегда возвращают объект CompletableFuture. Мы берем его и адаптируем к Mono, используя фабричный метод fromFuture(). Это сопоставляется с конечным объектом UploadResult.

7. Загрузка нескольких файлов

Обработка загрузки multipart/form-data может показаться простой, особенно при использовании библиотек, которые обрабатывают все детали за нас. Итак, можем ли мы просто использовать предыдущий метод для каждого загруженного файла? Ну да, но за это приходится платить: буферизация.

Чтобы использовать предыдущий метод, нам нужна длина части, но передача файла по частям не всегда включает эту информацию. Один из подходов — сохранить часть во временном файле, а затем отправить его в AWS, но это замедлит общее время загрузки. Это также означает дополнительное хранилище для наших серверов.

В качестве альтернативы здесь мы будем использовать многокомпонентную загрузку AWS. Эта функция позволяет разделить загрузку одного файла на несколько фрагментов, которые мы можем отправлять параллельно и не по порядку.

Шаги следующие, нам нужно отправить:

    «запрос createMultipartUpload — AWS отвечает идентификатором uploadId, который мы будем использовать в следующих вызовах. Фрагменты файлов, содержащие идентификатор uploadId, порядковый номер и содержимое. Получены ETags

Обратите внимание: мы будем повторять эти шаги для каждой полученной FilePart!

7.1. Конвейер верхнего уровня

Обработчик multipartUploadHandler в нашем классе @Controller отвечает за обработку, что неудивительно, многокомпонентных загрузок файлов. В этом контексте каждая часть может иметь данные любого типа, идентифицируемые своим MIME-типом. Фреймворк Reactive Web доставляет эти части нашему обработчику в виде потока объектов, которые реализуют интерфейс Part, которые мы будем обрабатывать по очереди:

return parts
  .ofType(FilePart.class)
  .flatMap((part)-> saveFile(headers, part))
  .collect(Collectors.toList())
  .map((keys)-> new UploadResult(HttpStatus.CREATED, keys)));

Этот конвейер начинается с фильтрации частей, которые соответствуют фактическому загруженному файлу, который всегда будет объектом, реализующим интерфейс FilePart. Затем каждая часть передается методу saveFile, который обрабатывает фактическую загрузку одного файла и возвращает сгенерированный ключ файла.

Собираем все ключи в список и, наконец, строим окончательный UploadResult. Мы всегда создаем новый ресурс, поэтому мы вернем более описательный статус CREATED (202) вместо обычного OK.

7.2. Обработка загрузки одного файла

Мы уже описали шаги, необходимые для загрузки файла с использованием многокомпонентного метода AWS. Однако есть одна загвоздка: служба S3 требует, чтобы каждая часть, кроме последней, имела заданный минимальный размер — в настоящее время 5 МБ.

Это означает, что мы не можем просто взять полученные фрагменты и сразу отправить их. Вместо этого нам нужно буферизовать их локально, пока мы не достигнем минимального размера или конца данных. Так как нам также нужно место для отслеживания количества отправленных частей и итоговых результатов CompletedPart, мы создадим простой внутренний класс UploadState для хранения этого состояния:

class UploadState {
    String bucket;
    String filekey;
    String uploadId;
    int partCounter;
    Map<Integer, CompletedPart> completedParts = new HashMap<>();
    int buffered = 0;
    // ... getters/setters omitted
    UploadState(String bucket, String filekey) {
        this.bucket = bucket;
        this.filekey = filekey;
    }
}

Учитывая необходимые шаги и буферизацию, мы заканчиваем С реализацией может показаться немного пугающим на первый взгляд:

Mono<String> saveFile(HttpHeaders headers,String bucket, FilePart part) {
    String filekey = UUID.randomUUID().toString();
    Map<String, String> metadata = new HashMap<String, String>();
    String filename = part.filename();
    if ( filename == null ) {
        filename = filekey;
    }       
    metadata.put("filename", filename);    
    MediaType mt = part.headers().getContentType();
    if ( mt == null ) {
        mt = MediaType.APPLICATION_OCTET_STREAM;
    }
    UploadState uploadState = new UploadState(bucket,filekey);     
    CompletableFuture<CreateMultipartUploadResponse> uploadRequest = s3client
      .createMultipartUpload(CreateMultipartUploadRequest.builder()
        .contentType(mt.toString())
        .key(filekey)
        .metadata(metadata)
        .bucket(bucket)
        .build());

    return Mono
      .fromFuture(uploadRequest)
      .flatMapMany((response) -> {
          checkResult(response);              
          uploadState.uploadId = response.uploadId();
          return part.content();
      })
      .bufferUntil((buffer) -> {
          uploadState.buffered += buffer.readableByteCount();
          if ( uploadState.buffered >= s3config.getMultipartMinPartSize() ) {
              uploadState.buffered = 0;
              return true;
          } else {
              return false;
          }
      })
      .map((buffers) -> concatBuffers(buffers))
      .flatMap((buffer) -> uploadPart(uploadState,buffer))
      .reduce(uploadState,(state,completedPart) -> {
          state.completedParts.put(completedPart.partNumber(), completedPart);              
          return state;
      })
      .flatMap((state) -> completeUpload(state))
      .map((response) -> {
          checkResult(response);
          return  uploadState.filekey;
      });
}

Мы начнем со сбора метаданных файла и их использования для создания объекта запроса, необходимого вызовом API createMultipartUpload(). Этот вызов возвращает CompletableFuture, который является отправной точкой для нашего конвейера потоковой передачи.

Давайте рассмотрим, что делает каждый шаг этого конвейера:

    После получения начального результата, который содержит сгенерированный S3 uploadId, мы сохраняем его в объекте состояния загрузки и начинаем потоковую передачу тела файла. Обратите внимание на использование здесь flatMapMany, который превращает Mono в Flux. Мы используем bufferUntil() для накопления необходимого количества байтов. Конвейер в этот момент изменяется с потока объектов DataBuffer на поток объектов List\u003cDataBuffer\u003e. Преобразование каждого List\u003cDataBuffer\u003e в ByteBuffer. Отправка ByteBuffer на S3 (см. Значения CompletedPart в uploadState Сигналы S3 о том, что мы завершили загрузку (подробнее об этом позже) Вернуть сгенерированный ключ файла

7.3. Загрузка частей файла

Еще раз поясним, что «часть файла» здесь означает часть одного файла (например, первые 5 МБ из 100 МБ файла), а не часть сообщения, файл, как в потоке верхнего уровня!

Конвейер загрузки файлов вызывает метод uploadPart() с двумя аргументами: состояние загрузки и ByteBuffer. Оттуда мы создаем экземпляр UploadPartRequest и используем метод uploadPart(), доступный в нашем S3AsyncClient, для отправки данных:

private Mono<CompletedPart> uploadPart(UploadState uploadState, ByteBuffer buffer) {
    final int partNumber = ++uploadState.partCounter;
    CompletableFuture<UploadPartResponse> request = s3client.uploadPart(UploadPartRequest.builder()
        .bucket(uploadState.bucket)
        .key(uploadState.filekey)
        .partNumber(partNumber)
        .uploadId(uploadState.uploadId)
        .contentLength((long) buffer.capacity())
        .build(), 
        AsyncRequestBody.fromPublisher(Mono.just(buffer)));
    
    return Mono
      .fromFuture(request)
      .map((uploadPartResult) -> {              
          checkResult(uploadPartResult);
          return CompletedPart.builder()
            .eTag(uploadPartResult.eTag())
            .partNumber(partNumber)
            .build();
      });
}

Здесь мы используем возвращаемое значение из запроса uploadPart() для создания экземпляра CompletedPart. Это тип AWS SDK, который нам понадобится позже при создании окончательного запроса, закрывающего загрузку.

7.4. Завершение загрузки

И последнее, но не менее важное: нам нужно завершить загрузку файла, состоящего из нескольких частей, отправив запрос completeMultipartUpload() на S3. Это довольно просто, учитывая, что конвейер загрузки передает всю необходимую нам информацию в качестве аргументов:

private Mono<CompleteMultipartUploadResponse> completeUpload(UploadState state) {        
    CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder()
        .parts(state.completedParts.values())
        .build();
    return Mono.fromFuture(s3client.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
        .bucket(state.bucket)
        .uploadId(state.uploadId)
        .multipartUpload(multipartUpload)
        .key(state.filekey)
        .build()));
}

8. Загрузка файлов с AWS

«По сравнению с загрузкой из нескольких частей загрузка объектов из корзины S3 — гораздо более простая задача. В этом случае нам не нужно беспокоиться о кусках или чем-то подобном. API SDK предоставляет метод getObject(), который принимает два аргумента:

    Объект GetObjectRequest, содержащий запрошенную корзину и ключ файла. реализаций последних, которые позволяют адаптировать поток к Flux, но, опять же, за свою цену: они буферизуют данные внутри буфера массива. Поскольку такая буферизация приводит к снижению времени отклика клиента нашей демонстрационной службы, мы реализуем собственный адаптер, что, как мы увидим, не имеет большого значения.

8.1. Контроллер загрузки

Наш контроллер загрузки — это стандартный Spring Reactive @RestController с единственным методом @GetMapping, который обрабатывает запросы на загрузку. Мы ожидаем ключ файла через аргумент @PathVariable и вернем асинхронный ResponseEntity с содержимым файла:

Здесь getMetadataItem() — это просто вспомогательный метод, который ищет данный ключ метаданных в ответе в нечувствительный к регистру способ.

@GetMapping(path="/{filekey}")
Mono<ResponseEntity<Flux<ByteBuffer>>> downloadFile(@PathVariable("filekey") String filekey) {    
    GetObjectRequest request = GetObjectRequest.builder()
      .bucket(s3config.getBucket())
      .key(filekey)
      .build();
    
    return Mono.fromFuture(s3client.getObject(request,new FluxResponseProvider()))
      .map(response -> {
        checkResult(response.sdkResponse);
        String filename = getMetadataItem(response.sdkResponse,"filename",filekey);            
        return ResponseEntity.ok()
          .header(HttpHeaders.CONTENT_TYPE, response.sdkResponse.contentType())
          .header(HttpHeaders.CONTENT_LENGTH, Long.toString(response.sdkResponse.contentLength()))
          .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + filename + "\"")
          .body(response.flux);
      });
}

Это важная деталь: S3 возвращает метаданные, используя специальные заголовки HTTP, но эти заголовки нечувствительны к регистру (см. RFC 7230, раздел 3.2). Это означает, что реализации могут изменить регистр для данного элемента по желанию — и это действительно происходит при использовании MinIO.

8.2. Реализация FluxResponseProvider

Наш FluxReponseProvider должен реализовать интерфейс AsyncResponseTransformer, который имеет только четыре метода:

prepare(), где мы можем выполнить любую необходимую настройку onResponse(), вызываемую, когда S3 возвращает статус ответа, и метаданные onStream(), вызываемой, когда ответ имеет тело, всегда после onResponse(), exceptionOccurred() вызывается в случае какой-либо ошибки. stream:

    Наконец, давайте кратко рассмотрим класс FluxResponse:

9. Заключение

class FluxResponseProvider implements AsyncResponseTransformer<GetObjectResponse,FluxResponse> {    
    private FluxResponse response;
    @Override
    public CompletableFuture<FluxResponse> prepare() {
        response = new FluxResponse();
        return response.cf;
    }

    @Override
    public void onResponse(GetObjectResponse sdkResponse) {            
        this.response.sdkResponse = sdkResponse;
    }

    @Override
    public void onStream(SdkPublisher<ByteBuffer> publisher) {
        response.flux = Flux.from(publisher);
        response.cf.complete(response);            
    }

    @Override
    public void exceptionOccurred(Throwable error) {
        response.cf.completeExceptionally(error);
    }
}

В этом руководстве мы рассмотрели основы использования реактивных расширений, доступных в Библиотека AWS SDK версии 2. Здесь мы сосредоточились на сервисе AWS S3, но мы можем распространить те же методы на другие реактивные сервисы, такие как DynamoDB.

class FluxResponse {
    final CompletableFuture<FluxResponse> cf = new CompletableFuture<>();
    GetObjectResponse sdkResponse;
    Flux<ByteBuffer> flux;
}

Как обычно, весь код доступен на GitHub.

«

As usual, all code is available over on GitHub.