当一些流线程死亡时(例如因为异常),我不想继续,而是重新启动进程。
为了做到这一点,我需要识别这种状态。
我知道我可以使用 kafkaStream.state()
但它会检查整个kstreams的状态,也就是说如果只有一个StreamThread死了,它不会被发现。意味着如果只有一个StreamThread死了,它将不会被 kafkaStream.state()
.
我在代码中知道所有的StreamThreads都活着并且在工作的最好方法是什么?
更新 :增加超时 KafkaStreams#close()
正如马蒂亚斯所言,它可能会导致僵局。在评论中
如果你想检测是否有任何StreamThreads死亡,那么你可以使用 KafkaStreams#setUncaughtExceptionHandler()
您可以停止流媒体并退出应用程序。
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
logger.error("Exiting ", e);
kafkaStreams.close(10);
System.exit(1);//exit with error code so container can restart this app
});