RxJava中具有超时的散布/聚集

问题描述 投票:1回答:1

我正在尝试使用RxJava和AsyncHttpClient并行调用多个Web服务,然后处理每个服务的结果。我需要限制等待服务返回的时间。我遇到的限制是如何处理未及时完成的​​服务。我在AsyncHttpRequest本身和Single.Merge()中使用take()限制了网络时间,但是如何确定哪个Single不返回,因为它们在take()过期之前没有完成?这是一些代码:

public List<Single<MyResponse>> fetchS(List<MyRequest> requestlist, final AsyncHttpClient ahc) {
        List<Single<MyResponse>> ret =
            requestlist.stream().map(req -> Single.fromCompletionStage(convert(req, ahc))).collect(
                Collectors.toList());
        return ret;
    }

var responsesS = new FetcherImpl().fetchS(requests, ahc);

Single<List<MyResponse>> merged =
           Single.merge(responsesS).take(500,TimeUnit.MILLISECONDS).toList();

merged.subscribeOn(Schedulers.computation()).subscribe( list -> { process(list);}) 

基本上,我如何处理在take()的500ms时间内未完成的项目?

TIA

java rx-java rx-java2
1个回答
0
投票

这就是我的做法:

class MyResponse {
            String reponse;
            Throwable error;
        }

        List<Single<MyOriginalResponse>> singles =
                new ArrayList(Single.defer(() -> Single.just(new MyOriginalResponse()))
                        .timeout(500, TimeUnit.MILLISECONDS)
                        .map(myResponse -> new MyResponse(myResponse))
                        .onErrorReturn(throwable -> MyResponse(throwable)));

基本上,您有一个包含响应或throwable的包装器类。 https://github.com/eleventigers/rxeither之类的解决方案可以为您提供Either<Throwable, MyOriginalResponse>,然后从那里继续进行操作,具体取决于获得的内容。

希望有帮助

© www.soinside.com 2019 - 2024. All rights reserved.