如何使用Webflux异步将一个对象放入S3?

问题描述 投票:0回答:1

一篇文章。AWS S3与Java - Reactive该书介绍了如何使用AWS SDK 2.0客户端与Webflux。

在本篇文章中,我们将介绍如何使用AWS SDK 2.0客户端和Webflux。例子他们使用下面的处理程序上传到S3,然后返回一个HTTP Created响应。

@PostMapping
public Mono<ResponseEntity<UploadResult>> uploadHandler(@RequestHeader HttpHeaders headers,
  @RequestBody Flux<ByteBuffer> body) {

    long length = headers.getContentLength();
    String fileKey = UUID.randomUUID().toString();
    Map<String, String> metadata = new HashMap<String, String>();

    CompletableFuture future = s3client
      .putObject(PutObjectRequest.builder()
        .bucket(s3config.getBucket())
        .contentLength(length)
        .key(fileKey.toString())
        .contentType(MediaType.APPLICATION_OCTET_STREAM.toString())
        .metadata(metadata)
        .build(), 
      AsyncRequestBody.fromPublisher(body));

    return Mono.fromFuture(future)
      .map((response) -> {
        checkResult(response);
        return ResponseEntity
          .status(HttpStatus.CREATED)
          .body(new UploadResult(HttpStatus.CREATED, new String[] {fileKey}));
      });
}

这样就能正常工作了。试图学习WebFlux,我期望下面的内容能够在调用subscribe方法的同一个线程中异步完成HTTP上传至S3。

@PostMapping
public Mono<ResponseEntity<UploadResult>> uploadHandler(@RequestHeader HttpHeaders headers, @RequestBody Flux<ByteBuffer> body) {

    long length = headers.getContentLength();
    String fileKey = UUID.randomUUID().toString();
    Map<String, String> metadata = new HashMap<String, String>();

    Mono<PutObjectResponse> putObjectResponseMono = Mono.fromFuture(s3client
        .putObject(PutObjectRequest.builder()
            .bucket(s3config.getBucket())
            .contentLength(length)
            .key(fileKey.toString())
            .contentType(MediaType.APPLICATION_OCTET_STREAM.toString())
            .metadata(metadata)
            .build(),
        AsyncRequestBody.fromPublisher(body)));

    putObjectResponseMono
        .doOnError((e) -> {
            log.error("Error putting object to S3 " + Thread.currentThread().getName(), e);
        })
        .subscribe((response) -> {
            log.info("Response from S3: " + response.toString() + "on " + Thread.currentThread().getName());
        });

    return Mono.just(ResponseEntity
        .status(HttpStatus.CREATED)
        .body(new UploadResult(HttpStatus.CREATED, new String[]{fileKey})));
}

HTTP POST按照预期完成了 但S3的Put请求失败了,出现了这条日志信息。

2020-06-10 12:31:22.275 ERROR 800 --- [tyEventLoop-0-4] c.b.aws.reactive.s3.UploadResource       : Error happened on aws-java-sdk-NettyEventLoop-0-4

software.amazon.awssdk.core.exception.SdkClientException: 400 BAD_REQUEST "Request body is missing: public reactor.core.publisher.Mono<org.springframework.http.ResponseEntity<com.baeldung.aws.reactive.s3.UploadResult>> com.baeldung.aws.reactive.s3.UploadResource.uploadHandler(org.springframework.http.HttpHeaders,reactor.core.publisher.Flux<java.nio.ByteBuffer>)"
    at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:97) ~[sdk-core-2.10.27.jar:na]
    at software.amazon.awssdk.core.internal.util.ThrowableUtils.asSdkException(ThrowableUtils.java:98) ~[sdk-core-2.10.27.jar:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryIfNeeded(AsyncRetryableStage.java:125) ~[sdk-core-2.10.27.jar:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:107) ~[sdk-core-2.10.27.jar:na]
    ........

我怀疑解释涉及到对S3的请求是在自己的线程上运行的 但我不知道是哪里出了问题,你能给我点提示吗?

java amazon-web-services amazon-s3 spring-webflux reactor
1个回答
0
投票

理想情况下,我觉得在反应式流程中,你应该从api的输入开始,将其转换并包含在api的输出中。就像在你的代码中,你是在单独的线程中进行订阅和上传,然后返回早期响应给客户端。可能在你的情况下,由于你提前返回响应,连接被关闭,aws客户端得到的是空体,所以输入流量没有被触发。正确的方法是将客户端响应链接到aws客户端响应后再返回,你可以尝试将aws响应链接到api响应,如下所示

return Mono.just(ResponseEntity
    .status(HttpStatus.CREATED)
    .body(putObjectResponseMono);

当然,如果你想的话,你可以在putObjectResponseMono上使用map,并将响应转化为你想发送给客户端的形式。

© www.soinside.com 2019 - 2024. All rights reserved.