我有一个像这样的流和两个接收器,但一次只使用一个:
Source.fromElements(1, 2, 3)
.via(flow)
.runWith(sink1)
要么
Source.fromElements(1, 2, 3)
.via(flow)
.runWith(sink2)
它是可配置的我们使用的接收器,但如果我并行使用两个接收器怎么办呢。我怎样才能做到这一点?
我想到了Sink.combine,但它还需要一个合并策略,我不想以任何方式结合这些接收器的结果。我并不真正关心它们,所以我只希望通过HTTP将相同的数据发送到某个端点,同时将它们发送到数据库。 Sink组合与广播非常相似,但是从头开始实现广播降低了我的代码的可读性,现在我只有简单的源,流和接收器,没有低级图形阶段。
你知道如何做到这一点的正确方法(背压和其他只使用一个水槽的东西)?
你可以使用alsoTo
(参见API docs):
Flow[Int].alsoTo(Sink.foreach(println(_))).to(Sink.ignore)
使用最简单形式的GraphDSL
进行广播不应该降低可读性 - 事实上,人们甚至可能认为~>
子句在某种程度上有助于可视化流结构:
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val bcast = builder.add(Broadcast[Int](2))
Source.fromElements(1, 2, 3) ~> flow ~> bcast.in
bcast.out(0) ~> sink1
bcast.out(1) ~> sink2
ClosedShape
})
graph.run()