如何使用AOP(@Around注解)拦截kafka消息,并使用@KafkaListener注解进行监听

问题描述 投票:0回答:1

我正在将 spring-boot 应用程序从 spring-boot 2.7.x 升级到 3.2.x。在 spring-boot 2.7.x 中,我使用 @StreamListener 来监听来自 kafka 的消息,但在将消息的控制权交给用 @StreamListener 注释的方法之前,我曾经使用

来拦截消息

@Around("@annotation(org.springframework.cloud.stream.annotation.StreamListener)")
public void msgProcessor(ProceedingJoinPoint jp){
//process msg
jp.proceed()
//process msg again after the processing at method annotated with @StreamListener 
}

但是现在 spring-boot 3.2.x StreamListener(以及其他注释,如 EnableBinding、Output、Input)已被弃用,所以我转移到 @KafkaListener 并更新了上面的代码,如下所示:

@Around("@annotation(org.springframework.kafka.annotation.KafkaListener)")
public void msgProcessor(ProceedingJoinPoint jp){
//process msg
jp.proceed()
//process msg again after the processing at method annotated with @KafkaListener
}

但是通过此更改,流本身不会被 @Around 注释拦截。 任何建议、修复或替代方案都会非常有帮助。

我尝试在类中实现 ConsumerInterceptor> ,当我配置 spring.kafka.consumer.properties.interceptor.class= 时,我可以拦截消息,但我没有 jp 来执行 jp.proceed()这样就可以实现消息的来回处理。

spring-boot apache-kafka spring-kafka kafka-consumer-api spring-aop
1个回答
0
投票

由于

@StreamListener
已被弃用,并且在 Spring Kafka 中没有直接等效项,因此我会选择
ConsumerInterceptor
,因为它允许在消费者级别拦截消息。要保持类似的流程,请考虑使用 Spring Kafka 的面向方面编程。拦截方法执行而不是直接针对
@KafkaListener
注释,应该是这样的方式

@Aspect
@Component
public class KafkaListenerInterceptor {
    
    @Around("@annotation(org.springframework.kafka.annotation.KafkaListener)")
    public Object interceptKafkaListener(ProceedingJoinPoint joinPoint) throws Throwable {

        //Process message before invoking Kafka listener method
        
        //Proceed with the invocation of the Kafka listener method
        Object result = joinPoint.proceed();
        
        //Process message after Kafka listener method invocation
        //You can access the result or modify it here if needed
        
        return result;
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.