我目前正在尝试在Scala中使用具有管道过滤器体系结构的Akka Streams制作程序。我有一个特定的图,它应该接受一个输入并将其输出到多个流。最后,所有不同流程的结果应合并为一个。就我而言,输入内容将是各种推文。然后,这些推文首先进入不同的过滤器,所有过滤器都位于一种类型上,然后进行扫描,该扫描仅计算已看到的某种特定类型的数量。在此之后,我希望将输出作为这些扫描的返回值并将其组合成一个元组。
现在,我为此设置了一个特定的图DSL,它使用Broadcast和ZipWith来做到这一点。我的代码如下:
val splitStreams =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val bcastTweets = builder.add(Broadcast[Tweet](4))
val zipTweets = builder.add(ZipWith[Int, Int, Int, Int, (Int, Int, Int, Int)]((a, b, c, d) => (a, b, c, d)))
bcastTweets.out(0) ~> retweetFlow ~> retweetCount ~> zipTweets.in0
bcastTweets.out(1) ~> replyFlow ~> replyCount ~> zipTweets.in1
bcastTweets.out(2) ~> quotedFlow ~> quotedCount ~> zipTweets.in2
bcastTweets.out(3) ~> normalFlow ~> normalCount ~> zipTweets.in3
FlowShape(bcastTweets.in, zipTweets.out)
})
问题是,当我测试此代码时,广播似乎都没有进入任何一个流程。
谁能告诉我我做错了什么,我已经看了大约2天了,无法解决。
所描述的问题与ZipWith
(和Zip
)有关,无法将过滤后的Shape用作其输入。我的猜测是Akka Stream不知道如何正确压缩单独过滤的Shapes的元素。显然,如果所涉及的流是使用ZipWith
的纯映射,则Zip
/ map
将起作用。
您需要的一种解决方法是将ZipWith
替换为Merge
和grouped
,如下面的琐碎示例所示,其中包含许多虚拟过滤流:
import akka.actor.ActorSystem import akka.stream.scaladsl._ import akka.stream._ implicit val system = ActorSystem("system") implicit val materializer = ActorMaterializer() // Not needed for Akka Stream 2.6+ implicit val ec = system.dispatcher val n = 4 def filterFlow(i: Int) = Flow[Int].filter(_ % n == i) val customFlow = Flow.fromGraph(GraphDSL.create() { implicit builder => import GraphDSL.Implicits._ val bcast = builder.add(Broadcast[Int](n)) val merger = builder.add(Merge[Int](n)) (0 until n).foreach{ i => bcast.out(i) ~> filterFlow(i) ~> merger.in(i) } FlowShape(bcast.in, merger.out) }) Source(0 to 9).via(customFlow).grouped(n).runForeach(println) // Output: // Vector(0, 1, 2, 3) // Vector(4, 5, 6, 7) // Vector(8, 9)
如果输出需要为元组,只需像下面那样应用
collect
(例如,对于n = 4):
val empty = -1 // Default place-holder value
Source(0 to 9).via(customFlow).grouped(n).collect{
case Vector(a) => (a, empty, empty, empty)
case Vector(a, b) => (a, b, empty, empty)
case Vector(a, b, c) => (a, b, c, empty)
case Vector(a, b, c, d) => (a, b, c, d)
}.runForeach(println)
// Output:
// (0, 1, 2, 3)
// (4, 5, 6, 7)
// (8, 9, -1, -1)