如何捕获最大RestartSource后alpakka kafka源流失败

问题描述 投票:0回答:1

如何捕获最大重启次数后RestartSource的错误? 我想在源失败最大次数后做一些事情。 我可以在日志中看到源重新启动。 我尝试添加 withAttributes 但它从未被调用。

        return RestartSource.onFailuresWithBackoff(restartSettings, () -> Consumer
                .committableSource(getConsumerSettings(), topics)
                .log("error on receiver topic")
                .mapMaterializedValue(ctrl -> {
                    control = ctrl;
                    return NotUsed.getInstance();
                })
                .withAttributes(ActorAttributes.withSupervisionStrategy(e -> {
                    log.error("Stream has failed", e);
                    return (Supervision.Directive) Supervision.stop();
                })));

如有任何建议,我们将不胜感激。

apache-kafka akka akka-stream alpakka akka-kafka
1个回答
0
投票

好的,我成功了。在这种情况下,属性似乎从未被调用,但这会导致整个流失败,因此结果将在附加的流和接收器完成上。就我而言,我有类似的事情:

streamCompletion = 
    createSource()
    ...
    runWith(Sink.ignore(), actorSystem);

然后我们可以在streamCompletion上得到失败信息,如下所示:

    streamCompletion.whenComplete((done, throwable) -> {
        if (throwable != null) {
            log.error("Some error occurred {} {}", done, throwable.getMessage(), throwable);
        }
    });
© www.soinside.com 2019 - 2024. All rights reserved.