Akka流通过流量限制并行/处理流量的吞吐量

问题描述 投票:5回答:1

我有一个用例,我想将消息发送到外部系统,但发送此消息的流程采用并返回我不能使用下游的类型。这是传递流程的一个很好的用例。我正在使用here的实现。最初我担心如果processingFlow使用mapAsyncUnordered,那么这个流程将不起作用。由于处理流程可能会重新排序消息,因此zip可能会推出具有错误对的元组。例如,在以下示例中。

  val testSource = Source(1 until 50)
  val processingFlow: Flow[Int, Int, NotUsed] = Flow[Int].mapAsyncUnordered(10)(x => Future {
    Thread.sleep(Random.nextInt(50))
    x * 10
  })
  val passThroughFlow = PassThroughFlow(processingFlow, Keep.both)

  val future = testSource.via(passThroughFlow).runWith(Sink.seq)

我希望处理流程可以根据其输入重新排序其输出,我会得到如下结果:

[(30,1), (40,2),(10,3),(10,4), ...]

右边(传递总是按顺序)但是通过我的mapAsyncUnordered的左边可能与一个不正确的元素连接以产生一个坏元组。

相反,我实际得到:

[(10,1), (20,2),(30,3),(40,4), ...]

每次。经过进一步调查后,我注意到代码运行缓慢,实际上它并没有完全并行运行,尽管我的地图异步无序。我尝试在前后引入一个缓冲区以及一个异步边界,但它似乎总是按顺序运行。这解释了为什么它总是有序,但我希望我的处理流程具有更高的吞吐量。

我想出了以下工作:

object PassThroughFlow {

  def keepRight[A, A1](processingFlow: Flow[A, A1, NotUsed]): Flow[A, A, NotUsed] =
    keepBoth[A, A1](processingFlow).map(_._2)

  def keepBoth[A, A1](processingFlow: Flow[A, A1, NotUsed]): Flow[A, (A1, A), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder => {
      import GraphDSL.Implicits._

      val broadcast = builder.add(Broadcast[A](2))
      val zip = builder.add(ZipWith[A1, A, (A1, A)]((left, right) => (left, right)))

      broadcast.out(0) ~> processingFlow ~> zip.in0
      broadcast.out(1) ~> zip.in1

      FlowShape(broadcast.in, zip.out)
    }
    })
}

object ParallelPassThroughFlow {


  def keepRight[A, A1](parallelism: Int, processingFlow: Flow[A, A1, NotUsed]): Flow[A, A, NotUsed] =
    keepBoth(parallelism, processingFlow).map(_._2)

  def keepBoth[A, A1](parallelism: Int, processingFlow: Flow[A, A1, NotUsed]): Flow[A, (A1, A), NotUsed] = {
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val fanOut = builder.add(Balance[A](outputPorts = parallelism))
      val merger = builder.add(Merge[(A1, A)](inputPorts = parallelism, eagerComplete = false))

      Range(0, parallelism).foreach { n =>
        val passThrough = PassThroughFlow.keepBoth(processingFlow)
        fanOut.out(n) ~> passThrough ~> merger.in(n)
      }

      FlowShape(fanOut.in, merger.out)
    })
  }

}

两个问题:

  1. original implementation中,为什么传递流中的zip限制了地图异步的并行度无序?
  2. 我的工作是健全还是可以改进?我基本上把我输入的输入传递给多个堆栈的传递流并将它们全部合并在一起。它似乎具有我想要的属性(即使处理流程重新排序,并行但仍保持顺序)但是感觉不对
scala akka akka-stream
1个回答
3
投票

您目击的行为是broadcastzip如何工作的结果:broadcast在其所有输出信号需求时向下游发射; zip在发出信号需求(并向下游发射)之前等待其所有输入。

broadcast.out(0) ~> processingFlow ~> zip.in0
broadcast.out(1) ~> zip.in1

考虑第一个元素(1)通过上图的移动。 1播放给processingFlowzipzip立即收到其中一个输入(1)并等待其他输入(10),这将需要更长的时间才能到达。只有当zip同时获得110时,才能从上游获取更多元素,从而触发第二个元素(2)通过流的移动。等等。

至于你的ParallelPassThroughFlow,我不知道为什么“对你来说感觉不对”。

© www.soinside.com 2019 - 2024. All rights reserved.