I'm having a lot of trouble testing a Kafka listener.
The class is as follows:
@Component
class KafkaListener(
    private val useCase: UseCase
) {
    @KafkaListener(
        topics = ["topic"],
        groupId = "demo",
        // this is giving me problems, everything works well when I remove it
        containerFactory = "containerFactory",
        clientIdPrefix = "prefix"
    )
    fun listener(message: String) {
        println("message received!")
        useCase.foo()
    }
}
The test:
@TestConfiguration
class TestConfig {
    @Bean
    fun containerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        val props = HashMap<String, Any?>()
        // hard-coded broker address, independent of spring.kafka.bootstrap-servers
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "127.0.0.1:9092"
        val defaultConsumer = DefaultKafkaConsumerFactory(props, StringDeserializer(), StringDeserializer())
        factory.consumerFactory = defaultConsumer
        return factory
    }
}
@Testcontainers
@SpringBootTest(properties = ["spring.kafka.consumer.auto-offset-reset=earliest"])
@EnableAutoConfiguration(exclude = [MongoAutoConfiguration::class])
@Import(KafkaListener::class, TestConfig::class)
class KafkaShippingGroupCreationListenerAdapterTest {
    companion object {
        @Container
        private val kafkaContainer = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.3"))

        @JvmStatic
        @DynamicPropertySource
        fun overrideProperties(registry: DynamicPropertyRegistry) {
            registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers)
        }
    }

    @MockBean
    private lateinit var useCase: UseCase

    @Autowired
    private lateinit var kafkaTemplate: KafkaTemplate<String, Any>

    @Autowired
    private lateinit var kafkaListener: KafkaListener

    @Test
    fun `should invoke the use case`() {
        kafkaTemplate.send("topic", "message")
        verify(useCase, timeout(5000).times(1)).foo()
    }
}
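When I don't specify the container factory on the listener function, everything works. When I do specify it, the listener does not pick up the Kafka bootstrap servers provided by the container.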
Why would you expect it to work? Boot's auto-configured factory uses the spring.kafka.bootstrap-servers property set in the test, but yours is hard-coded to "127.0.0.1:9092".
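One possible way to act on this is to have the custom factory read the same property that the test overrides, instead of a literal address. The following is only a sketch, assuming the `@DynamicPropertySource` value is visible to `@Value` resolution when the bean is created. The explicit `auto.offset.reset` entry is also an assumption: `spring.kafka.consumer.auto-offset-reset` configures Boot's own consumer factory, so a hand-built property map would not inherit it.

```kotlin
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.test.context.TestConfiguration
import org.springframework.context.annotation.Bean
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory

@TestConfiguration
class TestConfig {
    @Bean
    fun containerFactory(
        // Resolved from the Environment, so it follows the value registered
        // by the test's @DynamicPropertySource method.
        @Value("\${spring.kafka.bootstrap-servers}") bootstrapServers: String
    ): ConcurrentKafkaListenerContainerFactory<String, String> {
        val props = HashMap<String, Any>()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
        // Assumption: set explicitly because this map bypasses Boot's
        // spring.kafka.consumer.* properties.
        props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"

        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory =
            DefaultKafkaConsumerFactory(props, StringDeserializer(), StringDeserializer())
        return factory
    }
}
```

With this in place, the listener keeps `containerFactory = "containerFactory"` and the test itself should not need to change, since the factory now connects to whatever broker address the Testcontainers instance reports.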