一篇文章。AWS S3与Java - Reactive该书介绍了如何使用AWS SDK 2.0客户端与Webflux。
在本篇文章中,我们将介绍如何使用AWS SDK 2.0客户端和Webflux。例子他们使用下面的处理程序上传到S3,然后返回一个HTTP Created响应。
@PostMapping
public Mono<ResponseEntity<UploadResult>> uploadHandler(@RequestHeader HttpHeaders headers,
@RequestBody Flux<ByteBuffer> body) {
long length = headers.getContentLength();
String fileKey = UUID.randomUUID().toString();
Map<String, String> metadata = new HashMap<String, String>();
CompletableFuture future = s3client
.putObject(PutObjectRequest.builder()
.bucket(s3config.getBucket())
.contentLength(length)
.key(fileKey.toString())
.contentType(MediaType.APPLICATION_OCTET_STREAM.toString())
.metadata(metadata)
.build(),
AsyncRequestBody.fromPublisher(body));
return Mono.fromFuture(future)
.map((response) -> {
checkResult(response);
return ResponseEntity
.status(HttpStatus.CREATED)
.body(new UploadResult(HttpStatus.CREATED, new String[] {fileKey}));
});
}
这样就能正常工作了。试图学习WebFlux,我期望下面的内容能够在调用subscribe方法的同一个线程中异步完成HTTP上传至S3。
@PostMapping
public Mono<ResponseEntity<UploadResult>> uploadHandler(@RequestHeader HttpHeaders headers, @RequestBody Flux<ByteBuffer> body) {
long length = headers.getContentLength();
String fileKey = UUID.randomUUID().toString();
Map<String, String> metadata = new HashMap<String, String>();
Mono<PutObjectResponse> putObjectResponseMono = Mono.fromFuture(s3client
.putObject(PutObjectRequest.builder()
.bucket(s3config.getBucket())
.contentLength(length)
.key(fileKey.toString())
.contentType(MediaType.APPLICATION_OCTET_STREAM.toString())
.metadata(metadata)
.build(),
AsyncRequestBody.fromPublisher(body)));
putObjectResponseMono
.doOnError((e) -> {
log.error("Error putting object to S3 " + Thread.currentThread().getName(), e);
})
.subscribe((response) -> {
log.info("Response from S3: " + response.toString() + "on " + Thread.currentThread().getName());
});
return Mono.just(ResponseEntity
.status(HttpStatus.CREATED)
.body(new UploadResult(HttpStatus.CREATED, new String[]{fileKey})));
}
HTTP POST按照预期完成了 但S3的Put请求失败了,出现了这条日志信息。
2020-06-10 12:31:22.275 ERROR 800 --- [tyEventLoop-0-4] c.b.aws.reactive.s3.UploadResource : Error happened on aws-java-sdk-NettyEventLoop-0-4
software.amazon.awssdk.core.exception.SdkClientException: 400 BAD_REQUEST "Request body is missing: public reactor.core.publisher.Mono<org.springframework.http.ResponseEntity<com.baeldung.aws.reactive.s3.UploadResult>> com.baeldung.aws.reactive.s3.UploadResource.uploadHandler(org.springframework.http.HttpHeaders,reactor.core.publisher.Flux<java.nio.ByteBuffer>)"
at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:97) ~[sdk-core-2.10.27.jar:na]
at software.amazon.awssdk.core.internal.util.ThrowableUtils.asSdkException(ThrowableUtils.java:98) ~[sdk-core-2.10.27.jar:na]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryIfNeeded(AsyncRetryableStage.java:125) ~[sdk-core-2.10.27.jar:na]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:107) ~[sdk-core-2.10.27.jar:na]
........
我怀疑解释涉及到对S3的请求是在自己的线程上运行的 但我不知道是哪里出了问题,你能给我点提示吗?
理想情况下,我觉得在反应式流程中,你应该从api的输入开始,将其转换并包含在api的输出中。就像在你的代码中,你是在单独的线程中进行订阅和上传,然后返回早期响应给客户端。可能在你的情况下,由于你提前返回响应,连接被关闭,aws客户端得到的是空体,所以输入流量没有被触发。正确的方法是将客户端响应链接到aws客户端响应后再返回,你可以尝试将aws响应链接到api响应,如下所示
return Mono.just(ResponseEntity
.status(HttpStatus.CREATED)
.body(putObjectResponseMono);
当然,如果你想的话,你可以在putObjectResponseMono上使用map,并将响应转化为你想发送给客户端的形式。