要求:批量读取来自couchbase的一堆对象。即使一个对象不可用,也会抛出一条异常,并显示一条消息,指出以下对象不可用。
方法:我们使用RxJava和asyncBucket从DB中读取这些对象。
代码:
final List<String> failedToReadOrders = new ArrayList<>();
final List<Order> list = Observable
.from(ids)
.subscribeOn(Schedulers.io())
.flatMap(id -> asyncBucket
.get(id)
.doOnError(ex-> {
failedToReadOrders.add(id);
LOGGER.error("Error occured while reading order with ID key={}",id);
})
.retryWhen(retryFunc())
.onErrorResumeNext(Observable.empty()))
.map(doc-> couchbaseConversionService.convertJsonToJavaObject(doc.content(), Order.class))
.toList()
.toBlocking()
.single();
仅当asOnError方法是asyncBucket.get()方法抛出的任何异常时,才会调用它。但是,如果该项目不可用,则asyncBucket.get()文档会说“如果找不到该文档,则Observable在没有发出项目的情况下完成。”。
问题:我当然可以通过查找要读取的ID列表和返回的对象之间的差异来了解哪些项目未被读取。但是,是否可以在上面的代码中收集这些ID,这样我就不必再次在列表上循环了?
List<Order> list = Observable
.from(ids)
.subscribeOn(Schedulers.io())
.flatMap(id -> asyncBucket
.get(id)
.switchIfEmpty(Observable.error(new Throwable("No data found")))
.doOnError(ex-> {
failedToReadOrders.add(id);
LOGGER.error("Error occured while reading order with ID key={}",id);
})
.retryWhen(retryFunc())
.onErrorResumeNext(Observable.empty()))
.map(doc-> couchbaseConversionService.convertJsonToJavaObject(doc.content(), Order.class))
.toList()
.toBlocking()
.single();