Spring Kafka 提交监听器之后

问题描述 投票:0回答:1

我正在尝试编写抽象代码,该代码应该跟踪偏移量变化并在消息处理后执行一些操作。问题是我不想过多更改现有侦听器的代码。

找到了一些关于拦截器和AcknowledgingMessageListener的信息 - 消息监听器容器

拦截器并不是我真正的情况,因为需要在之后而不是之前执行操作。 AcknowledgingMessageListener - 不想/不需要手动提交偏移量。

活动列表也没有适合我的。

是否有任何优雅的方式至少在提交或消息处理后触发某些操作?

提前致谢!

java spring spring-boot spring-kafka
1个回答
0
投票

正如评论中提到的,使用

RecordInterceptor
是正确的策略。它提供了捕获成功侦听器退出和失败的选项,以及在侦听器成功退出和失败退出后调用的方法。这是拦截成功侦听器退出的示例。

@SpringBootApplication
public class So77918542Application {

    public static void main(String[] args) {
        SpringApplication.run(So77918542Application.class, args);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> kafkaConsumerFactory,
            RecordInterceptor<String, String> recordInterceptor) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory);
        factory.setRecordInterceptor(recordInterceptor);
        return factory;
    }

    @KafkaListener(id = "so59256214.id", topics = "so77918542.topic")
    void listen(ConsumerRecord<String, String> in) {
        System.out.println("From KafkaListener: " + in);
    }

    @Bean
    public RecordInterceptor<String, String> recordInterceptor() {
        return new CompositeRecordInterceptor<>() {

            @Override
            public void success(ConsumerRecord<String, String> record, Consumer<String, String> consumer) {
                System.out.println("Listener exited: " + record);
            }

        };
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Object> template) {
        return args -> {
            template.send("so77918542.topic", "my-data");
        };
    }

}

运行此应用程序时,我在输出中看到以下内容:

From KafkaListener: ConsumerRecord(topic = so77918542.topic, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1706822418988, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = my-data)
Listener exited: ConsumerRecord(topic = so77918542.topic, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1706822418988, serialized key size = -1, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = my-data)

© www.soinside.com 2019 - 2024. All rights reserved.