我正在尝试编写抽象代码,该代码应该跟踪偏移量变化并在消息处理后执行一些操作。问题是我不想过多更改现有侦听器的代码。
找到了一些关于拦截器和AcknowledgingMessageListener的信息 - 消息监听器容器。
拦截器并不是我真正的情况,因为需要在之后而不是之前执行操作。 AcknowledgingMessageListener - 不想/不需要手动提交偏移量。
活动列表也没有适合我的。
是否有任何优雅的方式至少在提交或消息处理后触发某些操作?
提前致谢!
正如评论中提到的,使用
RecordInterceptor
是正确的策略。它提供了捕获成功侦听器退出和失败的选项,以及在侦听器成功退出和失败退出后调用的方法。这是拦截成功侦听器退出的示例。
@SpringBootApplication
public class So77918542Application {
public static void main(String[] args) {
SpringApplication.run(So77918542Application.class, args);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> kafkaConsumerFactory,
RecordInterceptor<String, String> recordInterceptor) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory);
factory.setRecordInterceptor(recordInterceptor);
return factory;
}
@KafkaListener(id = "so59256214.id", topics = "so77918542.topic")
void listen(ConsumerRecord<String, String> in) {
System.out.println("From KafkaListener: " + in);
}
@Bean
public RecordInterceptor<String, String> recordInterceptor() {
return new CompositeRecordInterceptor<>() {
@Override
public void success(ConsumerRecord<String, String> record, Consumer<String, String> consumer) {
System.out.println("Listener exited: " + record);
}
};
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, Object> template) {
return args -> {
template.send("so77918542.topic", "my-data");
};
}
}
运行此应用程序时,我在输出中看到以下内容:
From KafkaListener: ConsumerRecord(topic = so77918542.topic, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1706822418988, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = my-data)
Listener exited: ConsumerRecord(topic = so77918542.topic, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1706822418988, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = my-data)