因此,我是Reactive编程的新手,我编写了一些要测试的代码。这些是更多的集成测试,因为我正在实时复制文件,以后再检查它们是否相同。我有一个MockWebServer
模拟响应为4xx
,在代码中处理得很好。不幸的是,我也得到了io.netty.handler.timeout.ReadTimeoutException
,它掩盖了我的自定义WebClientResponseException
,因此在测试中我得到了错误的异常。基本上我有两个问题,到底为什么我会收到io.netty.handler.timeout.ReadTimeoutException
异常?由于某种原因,它仅在doOnError()
方法之后出现,但我不确定为什么会这样。
现在,代码已经实现,并且正在同步,我很清楚这一点。
第二个问题是,在重试一定次数后,如何在测试中处理自定义异常?现在是3,只有那时我希望抛出其他异常。
这里是代码:
AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(targetPath, StandardOpenOption.WRITE);
Flux<DataBuffer> fileDataStream = Mono.just(filePath)
.map(file -> targetPath.toFile().exists() ? targetPath.toFile().length() : 0)
.map(bytes -> webClient
.get()
.uri(uri)
.accept(MediaType.APPLICATION_OCTET_STREAM)
.header("Range", String.format("bytes=%d-", bytes))
.retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new CustomException("4xx error")))
.onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new CustomException("5xx error")))
.bodyToFlux(DataBuffer.class)
.doOnError(throwable -> log.info("fileDataStream onError", throwable))
)
.flatMapMany(Function.identity());
return DataBufferUtils
.write(fileDataStream, fileChannel)
.map(DataBufferUtils::release)
.doOnError(throwable -> {
try {
fileChannel.force(true);
} catch (IOException e) {
throw new WritingException("failed force update to file channel", e);
}
})
.retry(3)
.doOnComplete(() -> {
try {
fileChannel.force(true);
} catch (IOException e) {
log.warn("failed force update to file channel", e);
throw new WritingException("failed force update to file channel", e);
}
})
.doOnError(throwable -> targetPath.toFile().delete())
.then(Mono.just(target));
响应为Mono<Path>
,因为我只对新创建和复制的文件的Path
感兴趣。
欢迎对代码发表任何评论。
基于该线程Downlolad and save file from ClientRequest using ExchangeFunction in Project Reactor建立了复制机制
所以基本上问题出在测试中。我只有一次MockResponse
排队到MockWebServer
,所以在重试WebClient
模拟服务器时没有设置任何响应(基本上它的行为就像根本不可用,因为没有模拟响应)。 >
为了能够在服务器完全关闭的情况下处理异常,我认为值得在通量链中添加类似以下内容的行:
.doOnError(ChannelException.class, e -> { throw new YourCustomExceptionForHandlingServerIsDownSituation("Server is unreachable", e); })
这将帮助您从Netty处理
ReadTimeoutException
(如果无法访问服务器),因为它扩展了ChannelException
类。始终处理您的异常。