GraphStage akka的重试机制

问题描述 投票:0回答:0

如果 postStop() 失败,我们如何重试/重启 graphStage?

我对 akka 比较陌生,非常感谢您的帮助。

这是我想做的:


    source
    .via(Auditing)
    .via(someTransformations)
    .async.to(sink)
**Auditing** - a class extending GraphStageWithMaterializedValue[FlowShape[T,T], Future[Unit]]
    contains:
      - inport, outport, flowShape
      - overridden method createLogicAndMaterializedValue() -> returns (GraphStageLogic, Future[Unit])
          - overriden method postStop(){ 
                //contains audit logic
                // handles failures and throws exception
            }
          - setHandler for input and output.

是否可以为类 Auditing 添加一些重试机制,以便它可以重新处理相同的数据并最终降落到接收器,而无需再次从源读取相同的数据。必须在 m 分钟内重试 n 次。

我正在考虑使用 RetryFlow.withBackOff() 之类的东西,但似乎这是针对 Flow 而不是 GraphStageLogic。此外,如果我可以在失败场景中返回相同的数据,这会起作用,但从失败阶段再次调用 sethandler(outport) 实际上并没有再次下沉。

注意:我确实有监督决策者来处理整体故障,但我需要重试此 GraphStage,而不是重新启动/恢复流。

scala akka reactive-programming akka-stream
© www.soinside.com 2019 - 2024. All rights reserved.