我想使用akka流进行跟踪。我尝试了广播,也尝试了To,但并没有解决我的问题
源-> DBSink-> APISink。
我首先想将元素发送到数据库,当事务成功完成时,我想将其发送到APISink。如果第一个接收器中的事务失败,我不想将其发送到API。
尚不清楚您想要实现什么。
您可以使用alsoTo将元素从源发送到多个接收器。
将给定的接收器附加到此流,这意味着通过此流的元素也将发送到接收器。
val sink1 = Sink.foreach[Int](_ => "got it at sink1")
val sink2 = Sink.foreach[Int](_ => "got it at sink2")
Source(List(1, 2, 3))
.alsoTo(sink1)
.to(sink2)
元素仅在两个接收器都发出“需求”时才会发送到两个接收器。其行为与Broadcast
相同。
如果发生错误,即在流处理中引发异常,则两个接收器都将收到错误通知,并且流默认情况下停止。说流是否需要停止,重新启动或恢复是监视策略的问题。
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))