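I am using the method below to send an email alert when the Kafka broker goes down, but it does not work: when the broker is down, I receive no alert. If I change the condition to if (controllerId != null), it evaluates to true while the broker is up and I do receive the mail alert. But I only want the alert when controllerId == null: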
    public void checkKafkaBrokerStatus() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Update with your Kafka bootstrap servers
        try (AdminClient adminClient = KafkaAdminClient.create(props)) {
            DescribeClusterResult describeClusterResult = adminClient.describeCluster();
            KafkaFuture<Node> controller = describeClusterResult.controller();
            Node controllerId = controller.get(); // Get the current controller ID
            // If controllerId is null, it indicates Kafka is not available or broker is down
            if (controllerId == null) {
                // Send email alert
                System.out.println("Kafka broker is down. Sending email alert...");
                mailService.sendMailAlert();
            }
        } catch (Exception e) {
            // Handle exception
            e.printStackTrace();
        }
    }
I tried switching between the two conditions: if (controllerId == null) does not work (no alert is sent), while if (controllerId != null) sends the mail alert, but only while the broker is up.
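If I remember correctly, when the broker is down Kafka throws an exception instead of returning null, so the null check is never reached.
Change the code to the following and test again: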
    public void checkKafkaBrokerStatus() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Update with your Kafka bootstrap servers
        try (AdminClient adminClient = KafkaAdminClient.create(props)) {
            DescribeClusterResult describeClusterResult = adminClient.describeCluster();
            KafkaFuture<Node> controller = describeClusterResult.controller();
            Node controllerId = controller.get(); // Get the current controller ID
            // If controllerId is not null, it indicates Kafka is available
            if (controllerId != null) {
                System.out.println("Kafka broker is up.");
            }
        } catch (Exception e) {
            // This block is executed if there is an exception, e.g., broker is down
            System.out.println("Kafka broker is down. Sending email alert...");
            mailService.sendMailAlert();
            e.printStackTrace();
        }
    }
I just moved the mailService.sendMailAlert() call into the exception handler block.
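To see why the null check never fires, it may help to reproduce the behaviour without a broker. KafkaFuture implements java.util.concurrent.Future, so the sketch below uses plain CompletableFuture objects as stand-ins for the controller future. The class name BrokerProbe and the helper completedWithValue are hypothetical, not part of the Kafka API. The sketch also uses the timed get(timeout, unit) overload, on the assumption that you would rather not wait for the client's default timeout when the broker is unreachable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper class for illustration; not part of the Kafka client.
final class BrokerProbe {

    // Returns true only if the future completes with a non-null value within the timeout.
    // A failed or timed-out future is reported as "not available" instead of
    // being checked for null, because get() throws in those cases.
    static boolean completedWithValue(Future<?> future, long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit) != null;
        } catch (ExecutionException | TimeoutException e) {
            // The future failed, or did not complete in time.
            return false;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can react to it.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Stand-ins for describeCluster().controller() in three situations.
        CompletableFuture<String> up = CompletableFuture.completedFuture("node-0");

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("simulated: broker unreachable"));

        CompletableFuture<String> pending = new CompletableFuture<>(); // never completes

        System.out.println(completedWithValue(up, 100, TimeUnit.MILLISECONDS));      // true
        System.out.println(completedWithValue(failed, 100, TimeUnit.MILLISECONDS));  // false
        System.out.println(completedWithValue(pending, 100, TimeUnit.MILLISECONDS)); // false
    }
}
```

In checkKafkaBrokerStatus() the same idea would presumably look like passing describeClusterResult.controller() to such a helper and calling mailService.sendMailAlert() when it returns false. That keeps the alert on a single code path whether the future fails, times out, or yields null.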