我想依次使用Akka Stream。
源-> Sink1-> Sink2
示例:假设我们以List(1,2,3)
作为来源
元素1:将其发送到接收器1,如果sink1
成功发送,将其发送到sink2
尚不清楚您想要实现什么。
您可以使用alsoTo将元素从源发送到多个接收器。
将给定的接收器附加到此流,这意味着通过此流的元素也将发送到接收器。
val sink1 = Sink.foreach[Int](_ => "got it at sink1")
val sink2 = Sink.foreach[Int](_ => "got it at sink2")
Source(List(1, 2, 3))
.alsoTo(sink1)
.to(sink2)
元素仅在两个接收器都发出“需求”时才会发送到两个接收器。其行为与Broadcast
相同。
如果发生错误,即在流处理中引发异常,则将向两个接收器发出错误通知。说流是否需要停止,重新启动或恢复是监视策略的问题。
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))