通过合并 akka 流中的多个慢速源来保持排序

问题描述 投票:0回答:1

我需要合并多个慢速源但保持顺序。 如果代码执行两次,顺序必须相同。

非常简单的解决方案如下:

 Source
    .from(partions())
    .flatMapConcat(partition -> slowSource(partition))

这可行,但第二个分区的慢速源是在第一个分区之后执行的。我想并行运行慢速源,但以稳定的顺序合并结果。

我这样尝试过:

Source
    .completionStage(Source
        .from(partions())
        .map(partition -> slowSource(partition).runWith(Sink.seq(), actorSystem))
        .runWith(Sink.seq(), actorSystem))
    .mapConcat(i -> i)
    .mapAsync(partitions, stage -> stage)
    .mapConcat(i -> i) 

它正在工作,但我需要为每个分区创建一个列表。 有更好的方法来实现吗?

java akka akka-stream
1个回答
3
投票

类似的事情应该接近你所要求的:

Source.from(partitions())
  .map(partition -> (slowSource(partition).buffer(1, OverflowStrategy.backpressure())).preMaterialize(actorSystem))
  .flatMapConcat(Pair::second)

通过将源附加到缓冲区并具体化它,您可以通过初始化它并向源发出需求信号来有效地“启动泵”。如果愿意的话,您可以调整缓冲区大小(例如,可以使用

statefulMap
在以后的分区上拥有更大的缓冲区)。如果源缓慢的主要原因是第一个元素出现之前有很长的延迟,但之后它们出现得相当快,这将允许每个分区快速设置。

© www.soinside.com 2019 - 2024. All rights reserved.