Testing a Kafka listener with a custom container factory in Spring Boot


I'm having a lot of trouble trying to test a Kafka listener.

The class is as follows:

@Component
class KafkaListener(
       private val useCase: UseCase
) {
   @KafkaListener(
           topics = ["topic"],
           groupId = "demo",
           // this is giving me problems, everything works well when I remove it
           containerFactory = "containerFactory",
           clientIdPrefix = "prefix"
   )
   fun listener(message: String) {
       println("message received!")
       useCase.foo()
   }
}

The test:

@TestConfiguration
class TestConfig {
    @Bean
    fun containerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        val props = HashMap<String, Any?>()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "127.0.0.1:9092"
        val defaultConsumer = DefaultKafkaConsumerFactory(props, StringDeserializer(), StringDeserializer())
        factory.consumerFactory = defaultConsumer

        return factory
    }
}

@Testcontainers
@SpringBootTest(properties = ["spring.kafka.consumer.auto-offset-reset=earliest"])
@EnableAutoConfiguration(exclude = [MongoAutoConfiguration::class])
@Import(KafkaListener::class, TestConfig::class)
class KafkaShippingGroupCreationListenerAdapterTest {

    companion object {
        @Container
        private val kafkaContainer = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.3"))

        @JvmStatic
        @DynamicPropertySource
        fun overrideProperties(registry: DynamicPropertyRegistry) {
            registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers)
        }
    }

    @MockBean
    private lateinit var useCase: UseCase

    @Autowired
    private lateinit var kafkaTemplate: KafkaTemplate<String, Any>

    @Autowired
    private lateinit var kafkaListener: KafkaListener

    @Test
    fun `should invoke the use case`() {
        kafkaTemplate.send("topic", "message")

        verify(useCase,
                timeout(5000).times(1)).foo()
    }
}

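When I don't specify the container factory on the listener function, everything works. When I do specify it, the listener does not pick up the Kafka bootstrap servers provided by the container.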

spring-boot spring-kafka testcontainers spring-kafka-test spring-boot-testcontainers
1 answer

Why would you expect that to work? Boot's auto-configured factory uses the spring.kafka.bootstrap-servers property set in the test, but yours is hard-coded to "127.0.0.1:9092".

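One possible way to act on this, as a minimal sketch rather than a definitive fix, is to have the custom factory read the same spring.kafka.bootstrap-servers property that the test registers through @DynamicPropertySource. The bean name containerFactory and the String deserializers come from the question. The @Value injection and the explicit auto.offset.reset setting are assumptions added here. The latter is included because a hand-built consumer factory does not read spring.kafka.consumer.* properties, so the "earliest" value passed to @SpringBootTest would otherwise not reach it.

```kotlin
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.test.context.TestConfiguration
import org.springframework.context.annotation.Bean
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory

@TestConfiguration
class TestConfig {
    @Bean
    fun containerFactory(
        // Assumption: resolve the broker address from the environment instead of
        // hard-coding it, so the value registered by @DynamicPropertySource is used.
        @Value("\${spring.kafka.bootstrap-servers}") bootstrapServers: String
    ): ConcurrentKafkaListenerContainerFactory<String, String> {
        val props = HashMap<String, Any>()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
        // Assumption: set explicitly, because this factory bypasses Boot's
        // spring.kafka.consumer.* properties.
        props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"

        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory =
            DefaultKafkaConsumerFactory(props, StringDeserializer(), StringDeserializer())
        return factory
    }
}
```

With this in place the listener can keep containerFactory = "containerFactory", and the consumer should connect to whatever address the Testcontainers broker reports.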