同一个流中有多个接收器

问题描述 投票:4回答:2

我有一个像这样的流和两个接收器,但一次只使用一个:

Source.fromElements(1, 2, 3)
.via(flow)
.runWith(sink1)

要么

Source.fromElements(1, 2, 3)
.via(flow)
.runWith(sink2)

它是可配置的我们使用的接收器,但如果我并行使用两个接收器怎么办呢。我怎样才能做到这一点?

我想到了Sink.combine,但它还需要一个合并策略,我不想以任何方式结合这些接收器的结果。我并不真正关心它们,所以我只希望通过HTTP将相同的数据发送到某个端点,同时将它们发送到数据库。 Sink组合与广播非常相似,但是从头开始实现广播降低了我的代码的可读性,现在我只有简单的源,流和接收器,没有低级图形阶段。

你知道如何做到这一点的正确方法(背压和其他只使用一个水槽的东西)?

scala akka akka-stream reactive-streams
2个回答
10
投票

你可以使用alsoTo(参见API docs):

Flow[Int].alsoTo(Sink.foreach(println(_))).to(Sink.ignore)

3
投票

使用最简单形式的GraphDSL进行广播不应该降低可读性 - 事实上,人们甚至可能认为~>子句在某种程度上有助于可视化流结构:

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._
  val bcast = builder.add(Broadcast[Int](2))

  Source.fromElements(1, 2, 3) ~> flow ~> bcast.in
  bcast.out(0) ~> sink1
  bcast.out(1) ~> sink2

  ClosedShape
})
graph.run()
© www.soinside.com 2019 - 2024. All rights reserved.