集成测试 KafkaListener - 确认侦听器已消费消息

问题描述 投票:0回答:1

大家好(希望是传奇人物加里·拉塞尔)

我正在尝试为 Kafka Listener 编写一些集成测试,目的是确认我的 KafkaListener 能够使用我的生产者发送的消息。

这是我的测试套件,我在断言检查中失败了。

@EmbeddedKafka(topics = {TOPIC})
@RunWith(SpringRunner.class)
@SpringBootTest(
    classes = {KafkaIntegrationTestConfiguration.class}
)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class KafkaIntegrationTest {
  @Inject
  private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

  @Inject
  private KafkaTemplate<String, Event> producer;

  @SpyBean
  private CustomKafkaConsumer consumer;

  @Test
  public void testKafkaListener() throws Exception {

    // initialize listener container
    ConcurrentMessageListenerContainer<?, ?> container =
      (ConcurrentMessageListenerContainer<?, ?>) kafkaListenerEndpointRegistry.getListenerContainer("custom-consumer");
    container.stop();
    @SuppressWarnings("unchecked")
    AcknowledgingConsumerAwareMessageListener<String, Event> messageListener =
        (AcknowledgingConsumerAwareMessageListener<String, Event>) container
        .getContainerProperties().getMessageListener();
    // create CountDownLatch
    CountDownLatch latch = new CountDownLatch(1);

    // wrap KafkaListener
    container.getContainerProperties()
      .setMessageListener(new AcknowledgingConsumerAwareMessageListener<String, Event>() {

        @Override
        public void onMessage(ConsumerRecord<String, Event> data, Acknowledgment acknowledgment,
                              Consumer<?, ?> consumer) {
          try {
            messageListener.onMessage(data, acknowledgment, consumer);
          } finally {
            latch.countDown();
          }
        }

      });

    container.start();

    // send event to kafka
    sendToKafkaTemplate();

    assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
  }

我的显式生产者配置为:

@EnableKafka
@EmbeddedKafka(topics = {TOPIC})
@Import({ KafkaAutoConfiguration.class })
@ComponentScan("..kafkaConfiguration")
@ComponentScan("..kafka")
public class KafkaIntegrationTestConfiguration {
  @Value("${kafka.mainClusterSchemaRegistry}")
  private String schemaRegistry;

  @Bean
  private KafkaTemplate<String, Event> customKafkaTemplate(EmbeddedKafkaBroker ekb) {

    Map<String, Object> config = KafkaTestUtils.producerProps(ekb);

    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry);

    DefaultKafkaProducerFactory<String, Event> factory = new DefaultKafkaProducerFactory<>(config);

    KafkaTemplate<String, Event> kafkaTemplate = new KafkaTemplate<>(factory);
    kafkaTemplate.setDefaultTopic(TOPIC);
    return kafkaTemplate;
  }
}

我希望有用的一个提示是,我尝试在 onMessage 覆盖函数中放置一个断点,但它永远不会到达那里。

关于要调查的内容有任何线索/建议吗?

PS 这里是我的 KafkaListenerFactory - 省略 KafkaListener 类,因为它配置了主题的相当简单的注释

  @Bean(name = LISTENER_CONTAINER_FACTORY_NAME)
  public ConcurrentKafkaListenerContainerFactory<String, Event>
      kafkaListenerContainerFactory(SslConfig sslConfig, KafkaConfig kafkaConfig) {
    final ConcurrentKafkaListenerContainerFactory<String, Event> factory =
        new ConcurrentKafkaListenerContainerFactory<>();

    final ConsumerFactory<String, Event> consumerFactor =
        consumerFactory(sslConfig, kafkaConfig);
    factory.setConsumerFactory(consumerFactor);

    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    // need to update to use new Error Handler
    factory.setErrorHandler(buildListenerErrorHandler());

    return factory;
  }


  @KafkaListener(
      id = "sync-consumer",
      topics = TOPIC,
      groupId = GROUP_ID,
      containerFactory = LISTENER_CONTAINER_FACTORY_NAME
  )
apache-kafka spring-kafka
1个回答
0
投票

您可能会用该

@KafkaListener
方法做一些事情。如果您致电一些服务机构,那就太好了。因此,将 bean 作为
@SpyBean
可能更容易,并且 Spring Boot 测试框架可以使用
@KafkaListener
将其正确注入到您的组件中。然后你spy它的方法并最终验证。您可能仍然需要
CountDownLatch
,因为消耗发生在另一个线程上。或者您可以只等待循环,直到对间谍进行验证为止。

© www.soinside.com 2019 - 2024. All rights reserved.