客户端断开连接时 Paho MQTT 消息可靠性

问题描述 投票:0回答:1

我正在开发 MQTT kafka 源连接器。这些消息是 来自 MQTT 发布者,然后连接器将订阅 MQTT topic(/iot/sensor/#) 并且这些消息将发布到 Kafka 经纪人。但与此同时,如果我再次停止连接器 重新启动连接器,然后连接器正在使用来自的消息 mqtt 代理,但一些 mqtt 消息丢失(如果我发送 1000 来自 MQTT 发布者的消息并且在 kafka 中未收到 1000 条消息) 我给 mqtt ClientID 的是唯一名称。我设置了 mqtt paho 客户端 属性 setCleanSession=false,QOS=1 并使用 MqttDefaultFilePersistence(DIR)。 Rabbitmq 版本是 3.6.10

伪代码

Public class MyTask extends SourceTask implements MqttCallback{
//Initialzied the queue
@Override
    public void start(Map<String, String> map) {
    //set necessary properties and configuration
    //set the required mqttConnect options and mqtt broker
     Paho mClient = new MqttClient(mqttURl,Unique ClientID, new MqttDefaultFilePersistence(DIR))
     mClient.connect(RequiredConnectProperties)
    }
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
    //poll the queue
    // publish messages to kafka
    }
    @Override
    public void stop() {
    //disconnect the mqtt paho clent
    }
    @Override
    public void messageArrived(String mqtttopic, MqttMessage message) throws Exception {
    //add the messages to queue

    }
}
java apache-kafka mqtt paho
1个回答
0
投票

请参阅我的答案 随着清除会话标志设置为 FALSE,我缺少已发布的值。我也遇到过同样的问题并解决了。

您必须在调用 connect 之前调用 mqttClient.setCallback() ,这专门解决了这个细微差别

© www.soinside.com 2019 - 2024. All rights reserved.