我有一个使用mysql binlog生成的数据流的应用程序。这是我的代码的相关部分。
KStream<GenericRecord,mysql.x.Envelope> xStream = builder.stream(sourceTopicName,
Consumed.with(XSerde.getGenericKeySerde(), XSerde.getEnvelopeSerde()));
出于某些健康检查的目的,我需要能够获取KafkaStream的状态。它正在运行,正在等待关机...我找不到KafkaStream和KStream之间的任何关系。
您在构建StreamsBuilder之后得到它。运行状况适用于整个应用程序,而不是单个流
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);