我正在将 spring-boot 应用程序从 spring-boot 2.7.x 升级到 3.2.x。在 spring-boot 2.7.x 中,我使用 @StreamListener 来监听来自 kafka 的消息,但在将消息的控制权交给用 @StreamListener 注释的方法之前,我曾经使用
来拦截消息
@Around("@annotation(org.springframework.cloud.stream.annotation.StreamListener)")
public void msgProcessor(ProceedingJoinPoint jp){
//process msg
jp.proceed()
//process msg again after the processing at method annotated with @StreamListener
}
但是现在 spring-boot 3.2.x StreamListener(以及其他注释,如 EnableBinding、Output、Input)已被弃用,所以我转移到 @KafkaListener 并更新了上面的代码,如下所示:
@Around("@annotation(org.springframework.kafka.annotation.KafkaListener)")
public void msgProcessor(ProceedingJoinPoint jp){
//process msg
jp.proceed()
//process msg again after the processing at method annotated with @KafkaListener
}
但是通过此更改,流本身不会被 @Around 注释拦截。 任何建议、修复或替代方案都会非常有帮助。
我尝试在类中实现 ConsumerInterceptor
由于
@StreamListener
已被弃用,并且在 Spring Kafka 中没有直接等效项,因此我会选择 ConsumerInterceptor
,因为它允许在消费者级别拦截消息。要保持类似的流程,请考虑使用 Spring Kafka 的面向方面编程。拦截方法执行而不是直接针对@KafkaListener
注释,应该是这样的方式
@Aspect
@Component
public class KafkaListenerInterceptor {
@Around("@annotation(org.springframework.kafka.annotation.KafkaListener)")
public Object interceptKafkaListener(ProceedingJoinPoint joinPoint) throws Throwable {
//Process message before invoking Kafka listener method
//Proceed with the invocation of the Kafka listener method
Object result = joinPoint.proceed();
//Process message after Kafka listener method invocation
//You can access the result or modify it here if needed
return result;
}
}