我需要将对象列表发送到我的远程服务器。因为它们可能很多而且很大,所以我使用flowable使用request(1)从arraylist一对一地发送它们。
对于每个对象,都会对服务器进行一次改造,作为回报,我获得了远程ID,并使用远程ID更新了本地对象。
我需要检测此任务的结束:即,发送的最后一个对象的最后一个响应,以防止对同一对象进行多次并发调用。
目前,一切正常,但是在答案从远程服务器到达之前,我得到了“完成”消息,因此在更新对象之前。
我该怎么做?
Flowable<Integer> observable = Flowable.range(0, objList.size());
observable.subscribe(new DefaultSubscriber<Integer>() {
@Override
public void onStart() {
Log.d(TAG, "on start");
request(1);
}
@Override
public void onNext(Integer t) {
Log.d(TAG, "on next : " + t);
MyObj = objList.get(t);
RetrofitHelper.createService(ObjService.class, true, authType, authToken).createOrUpdateObj(objList.get(t)).flatMap(p -> {
Log.d(TAG, "recu p");
if (p != null) {
try {
p.setSyncho(true);
// save remote id on obj
ObjDB.updateObj(p);
request(1);
return Observable.empty();
} catch (Throwable th) {
ExceptionHandler.logException(th);
return Observable.error(th);
}
} else {
request(1);
return Observable.empty(); // provisoirement si pb on renvoie vide
}
})
.onErrorResumeNext(r -> {
request(1);
Observable.empty();
})
.onExceptionResumeNext(error -> Observable.empty()) // go to next on error
.subscribeOn(Schedulers.io()).onErrorReturn(error -> {
Log.d("ERROR", error.getMessage());
return 0;
})
.onErrorResumeNext(Observable.empty()).subscribe();
}
@Override
public void onError(Throwable t) {
Log.e("XXX ERROR ", "" + t);
request(1);
patientSynchroInProgress = Boolean.FALSE;
}
@Override
public void onComplete() {
Log.e("XXX COMPLETE", "complete");
}
});
您应该在map(...)
运算符内部移动您的改造呼叫:
Flowable<Integer> observable = Flowable.range(0, objList.size());
observable
.map(t -> {
MyObj = objList.get(t);
return RetrofitHelper.createService(ObjService.class, true, authType, authToken).createOrUpdateObj(objList.get(t)).flatMap(p -> {
Log.d(TAG, "recu p");
if (p != null) {
try {
p.setSyncho(true);
// save remote id on obj
ObjDB.updateObj(p);
request(1);
return Observable.empty();
} catch (Throwable th) {
ExceptionHandler.logException(th);
return Observable.error(th);
}
} else {
request(1);
return Observable.empty(); // provisoirement si pb on renvoie vide
}
})
.onErrorResumeNext(r -> {
request(1);
Observable.empty();
})
.onExceptionResumeNext(error -> Observable.empty()) // go to next on error
.subscribeOn(Schedulers.io()).onErrorReturn(error -> {
Log.d("ERROR", error.getMessage());
return 0;
})
.onErrorResumeNext(Observable.empty())
})
.subscribe(new DefaultSubscriber<Integer>() {
@Override
public void onStart() {
Log.d(TAG, "on start");
request(1);
}
@Override
public void onNext(Integer t) {
Log.d(TAG, "on next : " + t);
}
@Override
public void onError(Throwable t) {
Log.e("XXX ERROR ", "" + t);
request(1);
patientSynchroInProgress = Boolean.FALSE;
}
@Override
public void onComplete() {
Log.e("XXX COMPLETE", "complete");
}
});
以前,您是在onNext(...)
中执行改装呼叫,因此您的网络响应可能不是顺序的。通过使用map(...)
运算符转换您的观测值,每次发射将成为一个单独的网络调用。这使您的onNext(...)
函数可以从您的改造调用中打印顺序结果。