我需要合并多个慢速源但保持顺序。 如果代码执行两次,顺序必须相同。
非常简单的解决方案如下:
Source
.from(partions())
.flatMapConcat(partition -> slowSource(partition))
这可行,但第二个分区的慢速源是在第一个分区之后执行的。我想并行运行慢速源,但以稳定的顺序合并结果。
我这样尝试过:
Source
.completionStage(Source
.from(partions())
.map(partition -> slowSource(partition).runWith(Sink.seq(), actorSystem))
.runWith(Sink.seq(), actorSystem))
.mapConcat(i -> i)
.mapAsync(partitions, stage -> stage)
.mapConcat(i -> i)
它正在工作,但我需要为每个分区创建一个列表。 有更好的方法来实现吗?
类似的事情应该接近你所要求的:
Source.from(partitions())
.map(partition -> (slowSource(partition).buffer(1, OverflowStrategy.backpressure())).preMaterialize(actorSystem))
.flatMapConcat(Pair::second)
通过将源附加到缓冲区并具体化它,您可以通过初始化它并向源发出需求信号来有效地“启动泵”。如果愿意的话,您可以调整缓冲区大小(例如,可以使用
statefulMap
在以后的分区上拥有更大的缓冲区)。如果源缓慢的主要原因是第一个元素出现之前有很长的延迟,但之后它们出现得相当快,这将允许每个分区快速设置。