RxJava2 Flowable:将对象一一发送到服务器并检测结束

问题描述 投票:0回答:1

我需要将对象列表发送到我的远程服务器。因为它们可能很多而且很大,所以我使用flowable使用request(1)从arraylist一对一地发送它们。

对于每个对象,都会对服务器进行一次改造,作为回报,我获得了远程ID,并使用远程ID更新了本地对象。

我需要检测此任务的结束:即,发送的最后一个对象的最后一个响应,以防止对同一对象进行多次并发调用。

目前,一切正常,但是在答案从远程服务器到达之前,我得到了“完成”消息,因此在更新对象之前。

我该怎么做?

Flowable<Integer> observable = Flowable.range(0, objList.size());

        observable.subscribe(new DefaultSubscriber<Integer>() {
            @Override
            public void onStart() {
                Log.d(TAG, "on start");
                request(1);
            }

            @Override
            public void onNext(Integer t) {
                Log.d(TAG, "on next : " + t);
                MyObj = objList.get(t);

                RetrofitHelper.createService(ObjService.class, true, authType, authToken).createOrUpdateObj(objList.get(t)).flatMap(p -> {

                    Log.d(TAG, "recu p");

                    if (p != null) {
                        try {
                            p.setSyncho(true);
                            // save remote id on obj
                            ObjDB.updateObj(p);
                            request(1);

                            return Observable.empty();
                        } catch (Throwable th) {
                            ExceptionHandler.logException(th);
                            return Observable.error(th);
                        }
                    } else {
                        request(1);
                        return Observable.empty();  // provisoirement si pb on renvoie vide
                    }
                })
                        .onErrorResumeNext(r -> {
                            request(1);
                            Observable.empty();
                        })
                        .onExceptionResumeNext(error -> Observable.empty()) // go to next on error
                        .subscribeOn(Schedulers.io()).onErrorReturn(error -> {
                    Log.d("ERROR", error.getMessage());
                    return 0;
                })

                        .onErrorResumeNext(Observable.empty()).subscribe();
            }


            @Override
            public void onError(Throwable t) {
                Log.e("XXX ERROR ", "" + t);
                request(1);
                patientSynchroInProgress = Boolean.FALSE;
            }

            @Override
            public void onComplete() {
                Log.e("XXX COMPLETE", "complete");
            }
        });
android rx-java2 flowable
1个回答
0
投票

您应该在map(...)运算符内部移动您的改造呼叫:

Flowable<Integer> observable = Flowable.range(0, objList.size());

observable
     .map(t -> {
            MyObj = objList.get(t);

            return RetrofitHelper.createService(ObjService.class, true, authType, authToken).createOrUpdateObj(objList.get(t)).flatMap(p -> {

                    Log.d(TAG, "recu p");

                    if (p != null) {
                        try {
                            p.setSyncho(true);
                            // save remote id on obj
                            ObjDB.updateObj(p);
                            request(1);

                            return Observable.empty();
                        } catch (Throwable th) {
                            ExceptionHandler.logException(th);
                            return Observable.error(th);
                        }
                    } else {
                        request(1);
                        return Observable.empty();  // provisoirement si pb on renvoie vide
                    }
                })
                        .onErrorResumeNext(r -> {
                            request(1);
                            Observable.empty();
                        })
                        .onExceptionResumeNext(error -> Observable.empty()) // go to next on error
                        .subscribeOn(Schedulers.io()).onErrorReturn(error -> {
                    Log.d("ERROR", error.getMessage());
                    return 0;
                })

                        .onErrorResumeNext(Observable.empty())
      })
     .subscribe(new DefaultSubscriber<Integer>() {
            @Override
            public void onStart() {
                Log.d(TAG, "on start");
                request(1);
            }

            @Override
            public void onNext(Integer t) {
                Log.d(TAG, "on next : " + t);
            }


            @Override
            public void onError(Throwable t) {
                Log.e("XXX ERROR ", "" + t);
                request(1);
                patientSynchroInProgress = Boolean.FALSE;
            }

            @Override
            public void onComplete() {
                Log.e("XXX COMPLETE", "complete");
            }
        });

以前,您是在onNext(...)中执行改装呼叫,因此您的网络响应可能不是顺序的。通过使用map(...)运算符转换您的观测值,每次发射将成为一个单独的网络调用。这使您的onNext(...)函数可以从您的改造调用中打印顺序结果。

© www.soinside.com 2019 - 2024. All rights reserved.