我正在开发 MQTT kafka 源连接器。这些消息是 来自 MQTT 发布者,然后连接器将订阅 MQTT topic(/iot/sensor/#) 并且这些消息将发布到 Kafka 经纪人。但与此同时,如果我再次停止连接器 重新启动连接器,然后连接器正在使用来自的消息 mqtt 代理,但一些 mqtt 消息丢失(如果我发送 1000 来自 MQTT 发布者的消息并且在 kafka 中未收到 1000 条消息) 我给 mqtt ClientID 的是唯一名称。我设置了 mqtt paho 客户端 属性 setCleanSession=false,QOS=1 并使用 MqttDefaultFilePersistence(DIR)。 Rabbitmq 版本是 3.6.10
伪代码
Public class MyTask extends SourceTask implements MqttCallback{
//Initialzied the queue
@Override
public void start(Map<String, String> map) {
//set necessary properties and configuration
//set the required mqttConnect options and mqtt broker
Paho mClient = new MqttClient(mqttURl,Unique ClientID, new MqttDefaultFilePersistence(DIR))
mClient.connect(RequiredConnectProperties)
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
//poll the queue
// publish messages to kafka
}
@Override
public void stop() {
//disconnect the mqtt paho clent
}
@Override
public void messageArrived(String mqtttopic, MqttMessage message) throws Exception {
//add the messages to queue
}
}
请参阅我的答案 随着清除会话标志设置为 FALSE,我缺少已发布的值。我也遇到过同样的问题并解决了。
您必须在调用 connect 之前调用 mqttClient.setCallback() ,这专门解决了这个细微差别