Kafka Streams TopologyTestDriver输入输出主题

问题描述 投票:0回答:1

我有一个基于非常出色,可靠和方便的TopologyTestDriver的Kafka Streams单元测试:

    try (TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(),
            streamsConfig(Serdes.String().getClass(), SpecificAvroSerde.class))) {

        TestInputTopic<String, Event> inputTopic = testDriver.createInputTopic(inputTopicName,
                Serdes.String().serializer(), eventSerde.serializer());

        TestOutputTopic<String, Frame> outputWindowTopic = testDriver.createOutputTopic(
                outputTopicName, Serdes.String().deserializer(), frameSerde.deserializer());

        ...

     }

我想测试更复杂的设置,其中“输出”主题是另一种拓扑的“输入”主题。

我可以在同一拓扑中定义几个输入和输出主题。但是,只要在相同的拓扑中使用相同的主题作为输入和输出主题,就会收到以下异常:

org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic events has already been registered by another source.
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.validateTopicNotAlreadyRegistered(InternalTopologyBuilder.java:578)
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addSource(InternalTopologyBuilder.java:378)
    at org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode.writeToTopology(StreamSourceNode.java:94)
    at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:303)
    at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
    at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:547)

看起来好像TopologyTestDriver没有提供定义输入输出主题的可能性,对吗?

apache-kafka-streams
1个回答
0
投票
两次使用相同的KStream对象,基本上是一个“扇出”(或“广播”),它将向两个操作员发送输入数据。
© www.soinside.com 2019 - 2024. All rights reserved.