在我的Java应用程序中,我有一个Kafka处理器。
我的处理方法如下所示:
@Override
public void process(String key, String value) {
System.out.println("In the process method, the offset is: " + context.offset());
//Some more code
}
其中context是init方法的ProcessorContext。
我启动应用程序并记录下来:
In the process method, the offset is: 1203
In the process method, the offset is: 1204
然后我再次启动应用程序,我收到相同的消息。在几次应用程序重新启动之后(或者在一段时间后,我找不到模式),进程方法停止被调用,我不再在应用程序启动时收到这些消息。
知道为什么有时会多次处理这些消息吗?
My Streams Config具有以下属性:
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someId");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);
props.put(StreamsConfig.STATE_DIR_CONFIG, "somedir");
编辑
下面的代码片段显示了我如何创建KafkaStream:
public class KafkaStreamsProcessorBean implements SmartLifecycle {
@Override
public synchronized void start() {
final KStreamBuilder builder = new KStreamBuilder();
final KStream<String, String> debeziumStream = builder.stream("debezium.topic");
debeziumStream.process(() -> debeziumProcessor);
kafkaStreams = new KafkaStreams(builder, streamsConfig);
kafkaStreams.start();
}
}
这里的streamsConfig是我已经显示的属性的配置,debeziumProcessor是第一个代码片段中的Kafka Processor。
默认情况下,Kafka Streams处理保证至少一次。这意味着可以重新处理消息。
在您的情况下,即使您将StreamsConfig.PROCESSING_GUARANTEE_CONFIG
设置为StreamsConfig.EXACTLY_ONCE
,您也可以在重启后看到相同的日志(具有相同的偏移信息)。
处理保证是关于在一个事务中将偏移和结果写入主题。这并不意味着消息不能多次处理(使用相同的键和值多次调用Processor :: process(...))。
以下场景是可能的:
Processor::process(...)
被召唤。Processor::process(...)
将调用相同的密钥和值