[目前,我有一个基本的Kafka流应用程序,该应用程序涉及仅具有源和处理器但没有接收器的拓扑。本质上,拓扑仅处理消息的使用。对于产生消息,我们在传递给拓扑的ProcessorSupplier实例中,特别是在重写的process
方法中,对Producer API进行调用。尽管我知道Producer API在这里是多余的,因为我可以简单地将接收器添加到拓扑中,但是我必须以这种方式设置流应用程序。至于测试,我尝试了TopologyTestDriver
包中提供的kafka-streams-test-utils类。但是,我不仅要测试拓扑,还要测试对Producer API的调用。使用TopologyTestDriver
要求我模拟我的Producer
实例,因为该实例与Streams API分开。结果,由于消息未被“转发”,因此我无法从TopologyTestDriver
中读取消息进行单元测试。
这是我的process
方法的简化版:
@Override
public void process(String key, String value) {
// some data processing stuff that I leave out for simplicity sake
String topic = "...";
Properties props = ...;
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord(topic, key, value);
producer.send(record);
}
这是我的样本单元测试的简化:
@Test
public void process() {
Topology topology = new Topology();
topology.addSource("source", "input-topic");
topology.addProcessor("processor", ..., "source");
Properties props = ...;
TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
ConsumerRecordFactory<String, String> factory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
// the following line will work fine as long as the producer is mocked
testDriver.pipeInput(factory.create("input-topic", "key", "value"));
// since the producer is mocked, no message can be read from the output topic
ProducerRecord<String, String> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());
assertNull(outputRecord); // returns true
}
总结一下我的问题,有没有一种方法可以编写一个单元测试来测试拓扑中的消息使用和产生,该拓扑使用Producer API将消息写到传出主题中?
您不应使用自定义Producer
,而应在Topology
中添加接收器。调用Producer.send()
是异步的,因此您可能会丢失数据。为避免数据丢失,您将需要使呼叫同步,即获取Future
返回的send()
并等待其完成,然后再返回process()
。但是,这对您的吞吐量有很大影响,因此不建议这样做。
如果添加接收器,则可以避免这些问题,因为Kafka Streams现在将了解向输出主题发送了哪些数据,因此不会发生数据丢失,而Kafka Streams可以使用性能更高的异步调用。
除了正确性问题,似乎您为当前代码中处理的每条消息创建一个新的KafkaProducer
,这效率很低。此外,使用接收器将简化您的代码。当然,您可以使用TopologyTestDriver
获得适当的测试功能。