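Hi all (hopefully the legendary Gary Russell),
I am trying to write some integration tests for a Kafka listener. The goal is to confirm that my KafkaListener can consume the messages sent by my producer.
Here is my test suite; it fails at the assertion.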
    @EmbeddedKafka(topics = {TOPIC})
    @RunWith(SpringRunner.class)
    @SpringBootTest(
        classes = {KafkaIntegrationTestConfiguration.class}
    )
    @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
    public class KafkaIntegrationTest {

        @Inject
        private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

        @Inject
        private KafkaTemplate<String, Event> producer;

        @SpyBean
        private CustomKafkaConsumer consumer;

        @Test
        public void testKafkaListener() throws Exception {
            // initialize listener container
            ConcurrentMessageListenerContainer<?, ?> container =
                (ConcurrentMessageListenerContainer<?, ?>) kafkaListenerEndpointRegistry.getListenerContainer("custom-consumer");
            container.stop();

            @SuppressWarnings("unchecked")
            AcknowledgingConsumerAwareMessageListener<String, Event> messageListener =
                (AcknowledgingConsumerAwareMessageListener<String, Event>) container
                    .getContainerProperties().getMessageListener();

            // create CountDownLatch
            CountDownLatch latch = new CountDownLatch(1);

            // wrap KafkaListener
            container.getContainerProperties()
                .setMessageListener(new AcknowledgingConsumerAwareMessageListener<String, Event>() {
                    @Override
                    public void onMessage(ConsumerRecord<String, Event> data, Acknowledgment acknowledgment,
                            Consumer<?, ?> consumer) {
                        try {
                            messageListener.onMessage(data, acknowledgment, consumer);
                        } finally {
                            latch.countDown();
                        }
                    }
                });
            container.start();

            // send event to kafka
            sendToKafkaTemplate();
            assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
        }
My explicit producer configuration is:
    @EnableKafka
    @EmbeddedKafka(topics = {TOPIC})
    @Import({ KafkaAutoConfiguration.class })
    @ComponentScan("..kafkaConfiguration")
    @ComponentScan("..kafka")
    public class KafkaIntegrationTestConfiguration {

        @Value("${kafka.mainClusterSchemaRegistry}")
        private String schemaRegistry;

        @Bean
        private KafkaTemplate<String, Event> customKafkaTemplate(EmbeddedKafkaBroker ekb) {
            Map<String, Object> config = KafkaTestUtils.producerProps(ekb);
            config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
            config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry);
            DefaultKafkaProducerFactory<String, Event> factory = new DefaultKafkaProducerFactory<>(config);
            KafkaTemplate<String, Event> kafkaTemplate = new KafkaTemplate<>(factory);
            kafkaTemplate.setDefaultTopic(TOPIC);
            return kafkaTemplate;
        }
    }
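One hint that I hope is useful: I tried putting a breakpoint inside the overridden onMessage method, but it is never reached.
Any clues or suggestions on what to investigate?
PS: here is my KafkaListenerFactory. I am omitting the KafkaListener class itself, since it is just a fairly simple annotation configured with the topic: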
    @Bean(name = LISTENER_CONTAINER_FACTORY_NAME)
    public ConcurrentKafkaListenerContainerFactory<String, Event>
            kafkaListenerContainerFactory(SslConfig sslConfig, KafkaConfig kafkaConfig) {
        final ConcurrentKafkaListenerContainerFactory<String, Event> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        final ConsumerFactory<String, Event> consumerFactor =
            consumerFactory(sslConfig, kafkaConfig);
        factory.setConsumerFactory(consumerFactor);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // need to update to use new Error Handler
        factory.setErrorHandler(buildListenerErrorHandler());
        return factory;
    }

    @KafkaListener(
        id = "sync-consumer",
        topics = TOPIC,
        groupId = GROUP_ID,
        containerFactory = LISTENER_CONTAINER_FACTORY_NAME
    )

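You probably do something in that @KafkaListener method, and it would be great if that something is a call to some service. In that case it may be easier to declare that bean as a @SpyBean; the Spring Boot test framework can then inject it properly into your component with the @KafkaListener. You spy on its method and verify the call at the end. You may still need a CountDownLatch, because consumption happens on a different thread. Alternatively, you can just wait in a loop until verification on the spy passes.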
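
To make the two waiting strategies concrete, here is a minimal plain-Java sketch with no Spring, Kafka, or Mockito involved. Everything in it is a hypothetical stand-in: `EventService` plays the role of the service your listener would call, `RecordingService` is a hand-rolled substitute for the spy, `deliverAsync` imitates the consumer thread, and `awaitUntil` is an illustrative helper, not a library API.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

final class AsyncAssertSketch {

    /** Hypothetical stand-in for the service the @KafkaListener method would call. */
    interface EventService {
        void handle(String event);
    }

    /** Hand-rolled substitute for a spy: counts calls and releases a latch on the first one. */
    static final class RecordingService implements EventService {
        final AtomicInteger calls = new AtomicInteger();
        final CountDownLatch latch = new CountDownLatch(1);

        @Override
        public void handle(String event) {
            calls.incrementAndGet();
            latch.countDown();
        }
    }

    /** Polls the condition until it holds or the timeout elapses; returns the final result. */
    static boolean awaitUntil(BooleanSupplier condition, Duration timeout)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() - deadline < 0) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(20); // short pause between checks
        }
        return condition.getAsBoolean();
    }

    /** Imitates the consumer thread: the service is invoked off the test thread. */
    static Thread deliverAsync(EventService service, String event) {
        Thread thread = new Thread(() -> service.handle(event), "fake-consumer");
        thread.start();
        return thread;
    }

    public static void main(String[] args) throws Exception {
        // Strategy 1: block on a latch that the service call releases.
        RecordingService first = new RecordingService();
        deliverAsync(first, "event-1");
        System.out.println("latch released: " + first.latch.await(10, TimeUnit.SECONDS));

        // Strategy 2: wait in a loop until the recorded call count matches.
        RecordingService second = new RecordingService();
        deliverAsync(second, "event-2");
        System.out.println("poll succeeded: "
                + awaitUntil(() -> second.calls.get() == 1, Duration.ofSeconds(10)));
    }
}
```

In a real test the latch would typically be counted down from a stubbed answer on the spy, and the polling loop can usually be replaced by Mockito's `verify(spy, timeout(millis))` verification mode, which retries the verification until it passes or the timeout expires.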