广播将不输出Akka流

问题描述 投票:0回答:1

我目前正在尝试在Scala中使用具有管道过滤器体系结构的Akka Streams制作程序。我有一个特定的图,它应该接受一个输入并将其输出到多个流。最后,所有不同流程的结果应合并为一个。就我而言,输入内容将是各种推文。然后,这些推文首先进入不同的过滤器,所有过滤器都位于一种类型上,然后进行扫描,该扫描仅计算已看到的某种特定类型的数量。在此之后,我希望将输出作为这些扫描的返回值并将其组合成一个元组。

现在,我为此设置了一个特定的图DSL,它使用Broadcast和ZipWith来做到这一点。我的代码如下:

val splitStreams =
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val bcastTweets = builder.add(Broadcast[Tweet](4))
    val zipTweets = builder.add(ZipWith[Int, Int, Int, Int, (Int, Int, Int, Int)]((a, b, c, d) => (a, b, c, d)))

    bcastTweets.out(0) ~> retweetFlow ~> retweetCount ~> zipTweets.in0
    bcastTweets.out(1) ~> replyFlow ~> replyCount ~> zipTweets.in1
    bcastTweets.out(2) ~> quotedFlow ~> quotedCount ~> zipTweets.in2
    bcastTweets.out(3) ~> normalFlow ~> normalCount ~> zipTweets.in3

    FlowShape(bcastTweets.in, zipTweets.out)
  })

问题是,当我测试此代码时,广播似乎都没有进入任何一个流程。

谁能告诉我我做错了什么,我已经看了大约2天了,无法解决。

scala akka akka-stream
1个回答
0
投票

所描述的问题与ZipWith(和Zip)有关,无法将过滤后的Shape用作其输入。我的猜测是Akka Stream不知道如何正确压缩单独过滤的Shapes的元素。显然,如果所涉及的流是使用ZipWith的纯映射,则Zip / map将起作用。

您需要的一种解决方法是将ZipWith替换为Mergegrouped,如下面的琐碎示例所示,其中包含许多虚拟过滤流:

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream._

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()  // Not needed for Akka Stream 2.6+
implicit val ec = system.dispatcher

val n = 4

def filterFlow(i: Int) = Flow[Int].filter(_ % n == i)

val customFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val bcast = builder.add(Broadcast[Int](n))
  val merger = builder.add(Merge[Int](n))

  (0 until n).foreach{ i =>
    bcast.out(i) ~> filterFlow(i) ~> merger.in(i)
  }

  FlowShape(bcast.in, merger.out)
})

Source(0 to 9).via(customFlow).grouped(n).runForeach(println)
// Output:
// Vector(0, 1, 2, 3)
// Vector(4, 5, 6, 7)
// Vector(8, 9)

如果输出需要为元组,只需像下面那样应用collect(例如,对于n = 4):

val empty = -1  // Default place-holder value

Source(0 to 9).via(customFlow).grouped(n).collect{
    case Vector(a)          => (a, empty, empty, empty)
    case Vector(a, b)       => (a, b, empty, empty)
    case Vector(a, b, c)    => (a, b, c, empty)
    case Vector(a, b, c, d) => (a, b, c, d)
  }.runForeach(println)
// Output:
// (0, 1, 2, 3)
// (4, 5, 6, 7)
// (8, 9, -1, -1)
© www.soinside.com 2019 - 2024. All rights reserved.