单元测试 KafkaStreams 给出 IllegalArgumentException:未知主题

问题描述 投票:0回答:1

我有一个应用程序,它使用 KStream 从 Kafka 读取数据,根据标头过滤数据,然后写入 KTable。

public Topology buildTopology() {
        KStream<String,String> inputStream = builder.stream("topicname");
        KStream<String,String> filteredStream = inputStream.transformValues(KSExtension::new)
                .filter((key,value) -> value!=null);
        
        kTable = filteredStream.groupByKey()
                .reduce(((value1, value2) -> value2));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        return builder.build();
    }

我正在尝试使用 TopologyTestDriver 为此创建一个单元测试

private TopologyTestDriver td;
    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, String> outputTopic;
    private Topology topology;
    private Properties streamConfig;

@BeforeEach
    void setUp() {
        streamConfig = new Properties();
        streamConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "AppId");
        streamConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");
        streamConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        topology = new Topology();
        td = new TopologyTestDriver(topology, streamConfig);
        inputTopic = td.createInputTopic("input-topic", Serdes.String().serializer(), Serdes.String().serializer());
        outputTopic = td.createOutputTopic("output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());
    }
 @Test
    void buildTopology(){
        inputTopic.pipeInput("key1", "value1");
        topology = app.buildTopology();
    }

当我运行测试时,我收到异常“java.lang.IllegalArgumentException:未知主题:输入主题”

DEBUG org.apache.kafka.streams.processor.internals.InternalTopologyBuilder - No source topics using pattern subscription found, initializing consumer's subscription collection.

java.lang.IllegalArgumentException: Unknown topic: input-topic
    at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:582)
    at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:945)
    at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:115)
    at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:137)
    at testclassname.buildTopology()

有人可以帮助我理解我在这里缺少什么吗?

java apache-kafka junit apache-kafka-streams ktable
1个回答
0
投票

我看到你正在创建一个 empty

Topology
用于初始化
TopologyTestDriver
:

topology = new Topology();
td = new TopologyTestDriver(topology, streamConfig);

当这个空拓扑用于实例化

TopologyTestDriver
td = new TopologyTestDriver(topology, streamConfig);
时,测试驱动程序不知道任何主题,因为没有有效构建拓扑。

我想这就是为什么,当您尝试使用

"input-topic"
将输入输入到
inputTopic.pipeInput("key1", "value1");
时,测试驱动程序会抛出一个
IllegalArgumentException
抱怨“
Unknown topic: input-topic
”。


您应该调用

buildTopology()
方法来生成您正在测试的实际拓扑,并在创建
TopologyTestDriver
时使用它。

确保测试中的主题名称 (

input-topic
output-topic
) 与实际应用程序中的主题名称 (
"topicname"
) 相匹配。

@BeforeEach
void setUp() {
    streamConfig = new Properties();
    streamConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "AppId");
    streamConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");
    streamConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    // Create the topology using your actual code
    topology = app.buildTopology();

    // Now create a TopologyTestDriver using the real topology
    td = new TopologyTestDriver(topology, streamConfig);

    // The topic name here should match the actual topic you use in the real topology
    inputTopic = td.createInputTopic("topicname", Serdes.String().serializer(), Serdes.String().serializer());

    // Create output topic if you need it
    // outputTopic = td.createOutputTopic("output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());
}

@Test
void buildTopology(){
    inputTopic.pipeInput("key1", "value1");
    // Your assertions here
}

注意:我从设置中删除了输出主题,因为在您的代码片段中,您没有指定写入

KTable
的输出主题。如果您的实际应用程序写入输出主题,您可以将其添加回来。

© www.soinside.com 2019 - 2024. All rights reserved.