我不想在不丢失发生故障时发送的数据的情况下跳过流程。但我找不到办法做到这一点。这是我用来测试的示例代码。
val decider: Supervision.Decider = {
case exception: Exception =>
// println(exception.getMessage)
Supervision.restart
}
Source(1 to 10)
.via(Flow[Int].map(x => {
if (x == 5) throw new Exception("boom!")
x
}))
.recover[Int] {
case e: Exception => 0
}
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.runWith(Sink.foreach(println))
获得的输出: 1 2 3 4 6 7 8 9 10
需要输出: 1 2 3 4 5 6 7 8 9 10
有没有办法在流程抛出异常时保留消息?或者其他可以帮助我解决这个问题的解决方案?
这是监督策略不可能实现的。其理念是,如果元素能够恢复,操作员就应该恢复它。只有在出现不可恢复的故障的情况下才会调用监督策略。
引用监督策略文档(对于 Akka Actors,不是 Akka Streams,但流继承了这些概念):
如果在处理消息时抛出异常(即从其邮箱中取出并移交给当前行为),则该消息将丢失。重要的是要了解它不会放回邮箱。因此,如果您想重试处理消息,您需要通过捕获异常并重试流程来自行处理。确保对重试次数进行限制,因为您不希望系统活锁(因此消耗大量 CPU 周期而不取得进展)。
毕竟,如果操作员第一次没有处理该元素,那么第二次会有什么不同呢? (因此出现了关于活锁的警告。)
因此,如果元素可以通过某种方式恢复,则使用 try/catch。监督和流错误处理更多的是关于如何决定在元素失败的情况下如何处理流的其余部分。