为什么用@StreamListener 注释的 kafka 侦听器表现不同于 kafka 侦听器实现消费者接口?

问题描述 投票:0回答:0

我们正在将 spring boot 2.2.6.release 升级到 2.7.8 并在 Kafka 侦听器上面临以下问题。

场景-1:

之前我们使用 SCS 3.0.4.Release 并使用@StreamListener 来消费 Kafka 消息。我们有以下一种情况,其中一个 Kafka 通道 TEST_CHANNEL 被 2 个不同的 @StreamListener 使用。当任何消息发布到 TEST_TOPIC 时,监听器都会收到每条消息的副本并对其进行处理。但我的理解是由于单个消费者组TEST_TOPIC_GROUP消息应该被每个听众以循环方式消费,一次只有一个听众消费消息并处理它。下面是我在我的项目中所做的示例配置。

@Component
public class TestListener1 {   

    @StreamListener("TEST_CHANNEL")
    public void handle(final Message<String> message) {
        log.info("execute Test Listener 1" );        
    }
}

@Component
public class TestListener2 {   

    @StreamListener("TEST_CHANNEL")
    public void handle(final Message<String> message) {
        log.info("execute Test Listener 2" );        
    }
}

配置:

spring:
  cloud:
    stream:
      bindings:
        TEST_CHANNEL:
          binder: kafka
          content-type: application/json
          destination: TEST_TOPIC
          group: TEST_TOPIC_GROUP
    

场景-2:

升级到 SCS 3.2.6 后,@StreamListener 已弃用,现在我们将使用 Consumer 接口,我们的 Kafka 监听器将实现 Consumer 接口。这里我们有 2 个不同的频道(TEST_CHANNEL_1,TEST_CHANNEL_2),具有相同的目的地(TEST_TOPIC)和消费者组(TEST_TOPIC_GROUP)。在这种情况下,两个侦听器都将收到消息,但消息是以循环方式在两个侦听器之间分发的。如场景 1 中所述,每个消息副本都不会被两个侦听器使用和处理。

@Component("TEST_CHANNEL_1")
public class TestListener1 implements Consumer<Message<String>> {
    
    @Override
    public void accept(final Message<String> message) {
        log.info("execute Test Listener 1" );        
    }
}

@Component("TEST_CHANNEL_2")
public class TestListener2 implements Consumer<Message<String>> {

    @Override    
    public void accept(final Message<String> message) {
        log.info("execute Test Listener 2" );        
    }
}

配置:

spring:
  cloud:
    stream:
      bindings:
        TEST_CHANNEL_1:
          binder: kafka
          content-type: application/json
          destination: TEST_TOPIC
          group: TEST_TOPIC_GROUP
        TEST_CHANNEL_2:
          binder: kafka
          content-type: application/json
          destination: TEST_TOPIC
          group: TEST_TOPIC_GROUP

是否有人更早遇到过这个问题,请提出建议?

apache-kafka spring-cloud-stream consumer spring-cloud-stream-binder-kafka spring-cloud-stream-binder
© www.soinside.com 2019 - 2024. All rights reserved.