Kafka StreamsUncaughtExceptionHandler REPLACE_THREAD 与 SHUTDOWN_CLIENT

问题描述 投票:0回答:1

我有一个旧主题,其中包含损坏的消息,我需要完全重新处理该主题,忽略无法处理的消息。 对于这种情况,哪种未捕获异常处理策略是正确的?

我无法完全理解 KStreams 应用程序上未捕获异常处理的 REPLACE_THREADSHUTDOWN_CLIENT 策略之间的差异。任何解释或参考都值得赞赏。

exception apache-kafka apache-kafka-streams exceptionhandler uncaughtexceptionhandler
1个回答
0
投票

由于我过去也很困惑,所以我做了一些研究,我发现的最好的东西是他们网站上的错误处理教程 confluence 中的内容。

REPLACE_THREAD - 替换接收异常的线程并 使用相同数量的已配置线程继续处理。 (注意:这可能会导致重复记录,具体取决于 应用程序的处理模式由 PROCESSING_GUARANTEE_CONFIG 值)

SHUTDOWN_CLIENT - 关闭 Kafka 的单个实例 Streams 应用程序遇到异常。 (这是之前的 行为和当前默认行为(如果您不提供) StreamsUncaughtExceptionHandler)

SHUTDOWN_APPLICATION - 关闭 Kafka Streams 的所有实例 具有相同应用程序 ID 的应用程序。 Kafka Streams 使用 重新平衡以指示所有应用程序实例关闭,因此即使 在另一台机器上运行的程序将收到信号并退出。

在此处查找更多详细信息https://developer.confluence.io/tutorials/error-handling/confluence.html。您会发现另一种模式,即 SHUTDOWN_APPLICATION。但我的建议是以不同的方式解决这个问题。

我过去也遇到过类似的数据损坏问题。我记得 JSON 主题中的 XML 消息和手动插入的测试数据不满足指定的任何架构,这导致应用程序每天多次崩溃。如果您希望 kstreams 应用程序在消耗损坏的情况下能够生存,我可以推荐以下两种解决方案,为您的 kstreams 应用程序带来更高的稳定性:

创建不良记录过滤器

在代码中初始化 kafka 流后,您将通过类似于此处示例的映射函数执行一些转换

KStream<String, String> inputStream = builder.stream("input-topic");
KStream<String, String> firstMappedStream = inputStream
  .map((key, value) -> { return <your-transformed-message>;}
);

在映射和转换之前,尝试使用专用的过滤器或映射器来执行所有安全逻辑。下一张地图将仅获得清理后的记录。所有损坏的记录将在之前的地图/过滤器中被丢弃。如果您愿意,您也可以实现日志。解决方案如下所示

KStream<String, String> inputStream = builder.stream("input-topic");
KStream<String, String> firstMappedStream = inputStream
  .filter((key, value) -> { return <your-validation-result>})
  .map((key, value) -> { return <your-transformed-message>;}
);
稳定你的转变

由于您无法想到所有可能的损坏方式,因此在映射器中额外使用异常处理非常适合构建稳定的应用程序。我的建议是将映射逻辑放在 try-catch 块中,并让映射函数为损坏的记录返回 null。这样它们就不会被传递到下一个地图函数,并且无论如何您的应用程序都将稳定运行。

希望有帮助

© www.soinside.com 2019 - 2024. All rights reserved.