将 Akka GraphDSL 与 Zip 阶段结合使用

问题描述 投票:0回答:1

考虑以下代码:

GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val in = Source(0 to 10)
  val fanOut = builder.add(Broadcast[Int](2))
  val toString = builder.add(Flow[Int].map(_.toString))
  val squared = builder.add(Flow[Int].map(x => x * x))
  val zip = builder.add(ZipLatestWith((str: String, sqr: Int) => (str, sqr)))
  val out = Sink.ignore

  in ~> fanOut ~> toString ~> zip ~> out
        fanOut ~> squared  ~> zip

  ClosedShape
}

我在

zip ~> out
中收到错误,指出 “重载方法 ~> 无法应用于 FanInShape2[String, Int, (String, Int)]”。当然,我可以通过以下方式重写该图形组合:

in ~> fanOut ~> toString ~> zip.in0; zip.out ~> out
      fanOut ~> squared  ~> zip.in1

但是我看到一些教程显示了没有

.in
.out
细节定义的分支。 DSL 关于 Zip 阶段是否有限制,或者我做错了什么?

scala akka akka-stream
1个回答
0
投票

这是 DSL 关于 zip 阶段相对于合并/连接阶段的限制。具体来说,对于 zip 阶段,因为入口可能是不同类型,所以使用 merge/concat 所采用的方法(其中要附加的第一个入口实际上是

in0
)不会保留类型安全性(Scala 中没有办法)来表达“
in
的第一个用法需要一个
Foo
,第二个需要一个
Bar
”)。虽然可以定义一个压缩两个相同类型的
Zip
运算符,但也有一种情况明确哪个入口映射到该对的哪一半对于防止构建图的顺序重新排序影响流中元素的含义(例如,如果压缩
(x, y)
坐标,则顺序的意外改变将是对角线反射)。

© www.soinside.com 2019 - 2024. All rights reserved.