我正在使用带有kafka集成的spring boot应用程序,我想实现一个端点来停止和启动kafka发布消息。该消息由另一个端点以异步方式触发。
豆子KafkaTemplate<String, String>
或ProducerFactory<String, String> producerFactory()
没有任何停止和暂停动作。
我的目标是能够模拟连接失败并确保这些消息存储在我所拥有的后备机制中。
有任何想法吗?
KafkaTemplate
没有那些回调,因为它是一个被动组件,只有在我们调用它时它才能完成。
为了模拟连接失败,我建议你实现一个自定义的ProducerFactory
生成一个KafkaProducer
与模拟或重写Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
。在那个ProducerFactory
,你可以实现适当的生命周期回调,并在提到的send()
实现中对状态作出反应。
org.apache.kafka.clients.producer.MockProducer
可能有一些东西可供您重复使用或借用。例如,看看它的close()
或fenceProducer()
。