如何在RxJava2中等待Maybe项目列表完成?

问题描述 投票:1回答:1

如果我对如何使用RxJava2缺乏基本的了解,请提前道歉,因为在我看来这应该是非常基础的东西。谷歌的搜索失败让我的大脑陷入困境,所以欢迎任何资源推荐。为了清楚起见,我选择使用我的变通代码的“消毒”表示。


Problem description

我有一个返回asyncCallForList()的RxJava2函数Maybe<Arraylist<CustomClass>>。此列表中的每个CustomClass对象仅填充一些基本字段(例如,源数据库仅包含唯一标识符和每个项目的标题字符串)。

每个项目所需的完整数据位于另一个数据库位置,该位置使用另一个函数asyncCallForItem(uid)检索,该函数根据唯一标识符返回Maybe<CustomClass>,其中封装的CustomClass具有所有必需的数据。将为asyncCallForList()返回的列表中的每个项调用此函数。

所需的功能是在填充列表中的所有对象后更新我的UI。


解决方法#1

很容易在连接到初始doOnSuccess()Maybe<Arraylist<CustomClass>>中循环生成的数组列表,然后在随后的异步调用返回的doOnSuccess()上的Maybe<CustomClass>中更新我的UI。这不是一个可接受的解决方法,因为将会有未知数量的UI更新(返回的初始列表可能包含任意数量的项目)并且会损害性能。


解决方法#2

这得到了预期的结果,但感觉就像是错误的方式 - 我怀疑有一个更优雅的RxJava2解决方案。基本上,我创建了一个自定义Observable,其中循环遍历列表中的项目并获取每个项目的完整数据。但是,每次填充CustomClass项目时,不是更新UI,而是增加计数器,然后检查计数器是否超过或等于初始列表大小。当满足这个条件时,我为可观察的发射器调用onComplete()方法并在那里更新UI。

private void fetchRemoteDataAndUpdateUi() {

    //Counter reset to zero before any asynchronous calls are made.
    int count = 0;

    Maybe<ArrayList<CustomClass>> itemList = asyncCallForList();
    Consumer<ArrayList<CustomClass>> onListReturnedSuccess;

    onListReturnedSuccess = new Consumer<ArrayList<CustomClass >>() {
        @Override
        public void accept(ArrayList<CustomClass> list) throws Exception {
            //Custom observable created here, in which the resulting array list is processed.
            listObservable = Observable.create(new ObservableOnSubscribe<CustomClass>() {
                @Override
                public void subscribe(final ObservableEmitter<CustomClass> e) throws Exception {
                    for (CustomClass customClass : list) {
                        final CustomClass thisCustomClass = customClass;
                        //Call to get full data on list item called here.
                        asyncCallForItem(customClass.getUid())
                                .doOnSuccess(new Consumer<CustomClass>() {
                                    @Override
                                    public void accept(CustomClass customClass) throws Exception {
                                        thisCustomClass.update(customClass);
                                        e.onNext(thisCustomClass);
                                        count++;
                                        if (count >= list.size()) {
                                            e.onComplete();
                                        }
                                    }
                                }).subscribe();
                    }
                }
            });

            listObservable
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeOn(Schedulers.io())
                    .subscribe(new Observer<CustomClass>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                        }

                        @Override
                        public void onNext(CustomClass customClass) {
                            //Here I add the populated CustomClass object to an ArrayList field that is utilised by the UI.
                            listForUi.add(customClass);
                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        @Override
                        public void onComplete() {
                            //Here the UI is finally updated once all CustomClass objects have been populated.
                            updateUi();
                        }
                    });
        }
    };


    //Kick everything off.
    itemList.doOnSuccess(onListReturnedSuccess).subscribe();
}
android rx-java2
1个回答
4
投票

flatMap吧!

asyncCallForList()
.subscribeOn(Schedulers.io())
.flatMapSingle(list ->
    Flowable.fromIterable(list)
    .flatMapMaybe(item -> 
        asyncCallForItem(item.id)
        .subscribeOn(Schedulers.io())
        .doOnSuccess(response -> {
            // copy state from the original item
            response.text = item.text;
        })
    , 1) // number of concurrent item calls
    .toList()
)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(successList -> { /* update UI */ }, error -> { /* report error */ });
© www.soinside.com 2019 - 2024. All rights reserved.