如果 postStop() 失败,我们如何重试/重启 graphStage?
我对 akka 比较陌生,非常感谢您的帮助。
这是我想做的:
source
.via(Auditing)
.via(someTransformations)
.async.to(sink)
**Auditing** - a class extending GraphStageWithMaterializedValue[FlowShape[T,T], Future[Unit]]
contains:
- inport, outport, flowShape
- overridden method createLogicAndMaterializedValue() -> returns (GraphStageLogic, Future[Unit])
- overriden method postStop(){
//contains audit logic
// handles failures and throws exception
}
- setHandler for input and output.
是否可以为类 Auditing 添加一些重试机制,以便它可以重新处理相同的数据并最终降落到接收器,而无需再次从源读取相同的数据。必须在 m 分钟内重试 n 次。
我正在考虑使用 RetryFlow.withBackOff() 之类的东西,但似乎这是针对 Flow 而不是 GraphStageLogic。此外,如果我可以在失败场景中返回相同的数据,这会起作用,但从失败阶段再次调用 sethandler(outport) 实际上并没有再次下沉。
注意:我确实有监督决策者来处理整体故障,但我需要重试此 GraphStage,而不是重新启动/恢复流。