使用TopologyTestDriver和Micronaut对Kafka流进行单元测试。

问题描述 投票:0回答:1

当我尝试运行一个基本的单元测试时,我得到以下错误。

java.lang.IllegalArgumentException: Unknown topic: streams-wordcount-output

    at org.apache.kafka.streams.TopologyTestDriver.getRecordsQueue(TopologyTestDriver.java:705)
    at org.apache.kafka.streams.TopologyTestDriver.readRecord(TopologyTestDriver.java:777)
    at org.apache.kafka.streams.TestOutputTopic.readRecord(TestOutputTopic.java:100)
    at at.wrwks.bmp.projekthistorie.streams.WordCountIntegrationTest.test(WordCountIntegrationTest.kt:70)

这是流处理代码(基于 Micronaut 文档):

/**
 * Micronaut factory that defines the word-count Kafka Streams topology.
 *
 * The topology reads lines of text from [INPUT], splits them into lower-cased words,
 * counts the occurrences of each word in the [WORD_COUNT_STORE] state store and writes
 * the running counts to [OUTPUT].
 */
@Factory
class WordCountStream {

    /**
     * Registers the word-count topology on [builder].
     *
     * @param builder the stream builder whose configuration is adjusted and on which the topology is defined
     * @return the source stream reading from [INPUT]
     */
    @Singleton
    @Named(STREAM_WORD_COUNT)
    fun wordCountStream(builder: ConfiguredStreamBuilder): KStream<String, String> {
        // set default serdes
        val props: Properties = builder.configuration
        props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String()::class.jvmName
        props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String()::class.jvmName
        props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
        val source: KStream<String, String> = builder.stream(INPUT)
        val groupedByWord: KTable<String, Long> = source
            // Kotlin's split(String) treats its argument as a literal delimiter, so the
            // pattern has to be passed as a Regex to actually split on non-word characters.
            // Empty tokens (produced by leading/trailing separators) are dropped so that
            // the empty string is never counted as a word.
            .flatMapValues { value -> value.toLowerCase().split(WORD_SEPARATOR).filter { it.isNotEmpty() } }
            .groupBy({ _, word -> word }, Grouped.with(Serdes.String(), Serdes.String()))
            //Store the result in a store for lookup later
            .count(Materialized.`as`(WORD_COUNT_STORE))
        groupedByWord
            //convert to stream
            .toStream()
            //send to output using specific serdes
            .to(OUTPUT, Produced.with(Serdes.String(), Serdes.Long()))
        return source
    }

    companion object {
        const val STREAM_WORD_COUNT = "word-count"
        const val INPUT = "streams-plaintext-input"
        const val OUTPUT = "streams-wordcount-output"
        const val WORD_COUNT_STORE = "word-count-store"

        // Compiled once instead of per record: one or more non-word characters.
        private val WORD_SEPARATOR = Regex("\\W+")
    }
}

这是测试代码

/**
 * Test from the question: drives a topology through [TopologyTestDriver] while a
 * mocked [ConfiguredStreamBuilder] is registered as a bean.
 *
 * This is the version that fails with "Unknown topic: streams-wordcount-output".
 */
@Tag("integration")
@MicronautTest(application = ProjekthistorieApplication::class)
class WordCountIntegrationTest {
    companion object {
        // Mock that is handed out by configuredStreamBuilder() below.
        val builder = mockk<ConfiguredStreamBuilder>()
    }
    @Inject
    private lateinit var context: ApplicationContext
    @Inject
    private lateinit var wordCountStream: KStream<String, String>
    @Inject
    private lateinit var wordCountStreamFactory: WordCountStream
    lateinit var inputWordCount: TestInputTopic<String, String>
    lateinit var outputWordCount: TestOutputTopic<String, Long>
    lateinit var wordCount: TestOutputTopic<String, Long>
    lateinit var testDriver: TopologyTestDriver
    private lateinit var before: LocalDateTime

    // Supplies the mocked builder as the ConfiguredStreamBuilder bean.
    @MockBean
    fun configuredStreamBuilder(): ConfiguredStreamBuilder {
        return builder
    }

    // Intentionally empty: all setup happens inside test().
    @BeforeEach
    fun setUp() {

    }

    // NOTE(review): testDriver is lateinit and only assigned inside test(); if test()
    // throws before that assignment this close() call would itself fail — verify.
    @AfterEach
    fun tearDown() {
        testDriver.close()
    }

    @Test
    fun test() {
        before = LocalDateTime.now()
        // A fresh, local StreamsBuilder; the mock is stubbed to return a stream created from it.
        val streamsBuilder = StreamsBuilder()
        every { builder.stream<String, String>(WordCountStream.INPUT) } returns streamsBuilder.stream(WordCountStream.INPUT)
        val config = Properties()
        config[StreamsConfig.APPLICATION_ID_CONFIG] = "bmp.projekthistorie.test"
        config[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "dummy:1234"
        // NOTE(review): the topology given to the driver is built from the local
        // streamsBuilder, on which this test only registers the INPUT source; nothing here
        // adds a sink for WordCountStream.OUTPUT to it. That appears to be why readRecord()
        // below reports "Unknown topic" — confirm.
        testDriver = TopologyTestDriver(streamsBuilder.build(), config)
        inputWordCount = testDriver.createInputTopic(WordCountStream.INPUT, Serdes.StringSerde().serializer(), Serdes.StringSerde().serializer())
        outputWordCount = testDriver.createOutputTopic(WordCountStream.OUTPUT, Serdes.StringSerde().deserializer(), Serdes.LongSerde().deserializer())
        // NOTE(review): STREAM_WORD_COUNT is passed as a topic name here — confirm this is intended.
        wordCount = testDriver.createOutputTopic(WordCountStream.STREAM_WORD_COUNT, Serdes.StringSerde().deserializer(), Serdes.LongSerde().deserializer())

        inputWordCount.pipeInput("word word la la")
        val readRecord = outputWordCount.readRecord()

        Assertions.assertThat(readRecord).isNotNull()
    }
}

我错过了什么?

unit-testing kotlin apache-kafka-streams junit5 micronaut
1个回答
0
投票

我认为问题出在

// Excerpt from the question's test: supplies `builder` (the mock) as the ConfiguredStreamBuilder bean.
@MockBean
fun configuredStreamBuilder(): ConfiguredStreamBuilder {
   return builder
}

你为测试驱动程序(TopologyTestDriver)创建了一个拓扑:

// The topology handed to the test driver is built from this locally created StreamsBuilder.
val streamsBuilder = StreamsBuilder()
testDriver = TopologyTestDriver(streamsBuilder.build(), config)

但你从未把它注入到你的流处理服务中。你注入的是一个模拟的 ConfiguredStreamBuilder。

尝试用 @MockBean 注入下面这个对象:

// The builder instance this answer suggests supplying instead of the mock.
val streamsBuilder = StreamsBuilder()

0
投票

因为 ConfiguredStreamBuilder 是 StreamsBuilder 的一个子类,所以没有必要使用模拟(mock);可以把 ConfiguredStreamBuilder 配置成一个 Bean,它将被用于流工厂 Bean 中。下面是可以正常工作的测试。

/**
 * Test for the word-count stream that runs the topology built by the injected
 * [ConfiguredStreamBuilder] through a [TopologyTestDriver], without mocking.
 *
 * @property configuredStreamBuilder the builder bean on which the stream factory defines its topology
 */
@Tag("integration")
@MicronautTest(environments = ["dev"])
class WordCountIntegrationTest(private val configuredStreamBuilder: ConfiguredStreamBuilder) {
    lateinit var inputWordCount: TestInputTopic<String, String>
    lateinit var outputWordCount: TestOutputTopic<String, Long>
    lateinit var testDriver: TopologyTestDriver

    /**
     * Provides a [ConfiguredStreamBuilder] configured with a dummy bootstrap server and
     * String default serdes.
     */
    @Bean
    fun configuredStreamBuilder(): ConfiguredStreamBuilder {
        val config = Properties()
        config[StreamsConfig.APPLICATION_ID_CONFIG] = "app"
        config[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "dummy:1234"
        config[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String()::class.jvmName
        config[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String()::class.jvmName
        return ConfiguredStreamBuilder(config)
    }

    /** Pipes one line of text into the input topic and expects a record on the output topic. */
    @Test
    fun test() {
        testDriver = TopologyTestDriver(configuredStreamBuilder.build(), configuredStreamBuilder.configuration)
        // use { } guarantees the driver is closed even when an assertion fails;
        // the original never closed it.
        testDriver.use { driver ->
            inputWordCount = driver.createInputTopic(WordCountStream.INPUT, Serdes.StringSerde().serializer(), Serdes.StringSerde().serializer())
            outputWordCount = driver.createOutputTopic(WordCountStream.OUTPUT, Serdes.StringSerde().deserializer(), Serdes.LongSerde().deserializer())

            inputWordCount.pipeInput("word word la la")
            val readRecord = outputWordCount.readRecord()

            assertThat(readRecord).isNotNull()
        }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.