我正在尝试将传入的 Source[ByteString, Any] 广播到 2 个不同的流,然后扇入(zip)输出。但是我收到错误“重载方法〜>使用替代方案”。
val byteStringSource: Source[ByteString, Any] = Source.fromIterator(() => (1 to 10).map(i => ByteString(s"Element $i")).iterator)
val incrementer = Flow[String].map{ x =>
x
}
val multiplier = Flow[String].map{ x =>
x
}
val output = Sink.foreach[(Type1, Type2)] { n1 =>
println(s"First obj is ${(n1._1.toString)} & second obj is ${n1._2.toString}")
}
val graph = RunnableGraph.fromGraph(
GraphDSL.create() {implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[String](2))
val zip = builder.add(Zip[Type1, Type2])//fan-in operator
byteStringSource ~> broadcast
broadcast.out(0) ~> incrementer ~> zip.in0
broadcast.out(1) ~> multiplier ~> zip.in1
zip.out ~> output
ClosedShape
}
)
graph.run()
我该如何解决这个问题?
您的源的类型为
ByteString
,但广播元素的类型为 String
,因此 ~>
运算符不适用。
对于您的简单示例,您可以删除 ByteString 并仅使用纯字符串。如果您的实际情况使用更复杂的类型,您可以将原始源映射到广播将接受的类型,所以类似
val theSource = byteStringSource.map(byteString => ...)
然后在你的 GraphDSL 中使用
theSource
。