我有一个基于非常出色,可靠和方便的TopologyTestDriver的Kafka Streams单元测试:
try (TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(),
streamsConfig(Serdes.String().getClass(), SpecificAvroSerde.class))) {
TestInputTopic<String, Event> inputTopic = testDriver.createInputTopic(inputTopicName,
Serdes.String().serializer(), eventSerde.serializer());
TestOutputTopic<String, Frame> outputWindowTopic = testDriver.createOutputTopic(
outputTopicName, Serdes.String().deserializer(), frameSerde.deserializer());
...
}
我想测试更复杂的设置,其中“输出”主题是另一种拓扑的“输入”主题。
我可以在同一拓扑中定义几个输入和输出主题。但是,只要在相同的拓扑中使用相同的主题作为输入和输出主题,就会收到以下异常:
org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic events has already been registered by another source.
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.validateTopicNotAlreadyRegistered(InternalTopologyBuilder.java:578)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addSource(InternalTopologyBuilder.java:378)
at org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode.writeToTopology(StreamSourceNode.java:94)
at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:303)
at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:547)
看起来好像TopologyTestDriver没有提供定义输入输出主题的可能性,对吗?
KStream
对象,基本上是一个“扇出”(或“广播”),它将向两个操作员发送输入数据。