我正在尝试使用RxJava和AsyncHttpClient并行调用多个Web服务,然后处理每个服务的结果。我需要限制等待服务返回的时间。我遇到的限制是如何处理未及时完成的服务。我在AsyncHttpRequest本身和Single.Merge()中使用take()限制了网络时间,但是如何确定哪个Single不返回,因为它们在take()过期之前没有完成?这是一些代码:
public List<Single<MyResponse>> fetchS(List<MyRequest> requestlist, final AsyncHttpClient ahc) {
List<Single<MyResponse>> ret =
requestlist.stream().map(req -> Single.fromCompletionStage(convert(req, ahc))).collect(
Collectors.toList());
return ret;
}
var responsesS = new FetcherImpl().fetchS(requests, ahc);
Single<List<MyResponse>> merged =
Single.merge(responsesS).take(500,TimeUnit.MILLISECONDS).toList();
merged.subscribeOn(Schedulers.computation()).subscribe( list -> { process(list);})
基本上,我如何处理在take()的500ms时间内未完成的项目?
TIA
这就是我的做法:
class MyResponse {
String reponse;
Throwable error;
}
List<Single<MyOriginalResponse>> singles =
new ArrayList(Single.defer(() -> Single.just(new MyOriginalResponse()))
.timeout(500, TimeUnit.MILLISECONDS)
.map(myResponse -> new MyResponse(myResponse))
.onErrorReturn(throwable -> MyResponse(throwable)));
基本上,您有一个包含响应或throwable的包装器类。 https://github.com/eleventigers/rxeither之类的解决方案可以为您提供Either<Throwable, MyOriginalResponse>
,然后从那里继续进行操作,具体取决于获得的内容。
希望有帮助