我对使用反应流非常陌生,遇到了以下问题,这使我很难解决。目的是从MongoDB数据库中获取许多文档。对于每个文档,请从db中获取元数据,并从db中获取文件(示例代码中尚未提供)。然后,我们需要将所有数据上传到s3(将所有三个项目组合在一起)。但是,我坚持合并不同的发布者而不会弄乱元素的顺序。
Publisher<Document> p = versionCollection.find();
ConnectableFlowable<Document> version = Flowable.fromPublisher(p).publish();
Observable<GridFSFile> gridFS = version
.map(extractID())
.flatMap(loadGridFSFile()).toObservable();
Observable c = version.toObservable()
.zipWith(gridFS, (Document v, GridFSFile f) -> {
// if I check here if both messages belong together, the order sometimes is messed up
return v;
});
version.connect();
因此,基本上,我试图将事件发布到两个不同的路径,一个路径从GridFS获取元数据,然后尝试再次合并这两个路径(以便可以将初始文档与元数据一起访问)。但是,我注意到有时事件会以不同的顺序压缩(可能是因为对db的查询有时会花费不同的时间)。
每个事件的执行路径应该是这样
v
|
/ | \
v query db query db
\ | /
upload aggregate
of all 3 elements
基本上,问题在于,使用我的方法,最终得到的结果是先前查询不同元素v的早期或更晚结果。我可能需要以某种方式确保执行路径在一个输入元素的所有3条路径之间同步发生一次,但我不知道如何。
编辑
我终于找到了一种似乎可以满足需要的方法。但是,以并行方式处理并确保它们保持同步似乎太复杂了,这感觉有点奇怪。
Publisher<Document> p = versionCollection.find();
Observable<Document> version = Observable.fromPublisher(p);
version.flatMap(v -> {
ConnectableObservable<Document> connectableObservable = Observable.just(v).replay();
Observable o = connectableObservable
.map(extractAudioID())
.flatMap(loadGridFSFile(audioBucket));
Observable o3 = connectableObservable.zipWith(o, (Document a, GridFSFile f) -> {
// now everything seems to stay in order here
// and we can combine both results
});
o3.subscribe();
o.subscribe();
Disposable a = connectableObservable.connect();
return connectableObservable;
}, 1).blockingSubscribe();
static Function<ObjectId, ObservableSource<GridFSFile>> loadGridFSFile(GridFSBucket audioBucket) {
return id -> Observable.fromPublisher(audioBucket.find(new Document("_id", id)).first());
}
我对使用反应流非常陌生,遇到了以下问题,这使我很难解决。目的是从MongoDB数据库中获取许多文档。 ...
似乎可以解决问题的几件事: