我们在您的项目中使用mosquito将您的IoT设备连接到云端。
但是,当只有30个设备(服务器具有3.1 CPU和8G RAM)时,我们总是会出现“发布过多”错误。
我们尝试将qos设置为0,1,2。然而,没有一个工作。
任何人都可以提出一些建议如何修复它?
publishmesssage
public static void publishMessage(MqttPubMsg config) {
String clientId = MqttClient.generateClientId();
MemoryPersistence persistence = new MemoryPersistence();
MqttClient sampleClient = new MqttClient(config.getBroker(), clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setUserName(config.getUsername());
connOpts.setPassword(config.getPassword().toCharArray());
connOpts.setConnectionTimeout(mqttConnectTimeout);
connOpts.setKeepAliveInterval(mqttKeppAliveInterval);
sampleClient.connect(connOpts);
MqttMessage message = new MqttMessage(config.getContent());
message.setQos(0);
sampleClient.publish(config.getTopic(), message);
System.out.println("Message published");
}
mosquitto.conf
max_inflight_messages 0
max_queued_messages 0
max_connections -1
subscribemessage
public static void subscribeMessage(MqttSubMsg config) {
System.out.println(config.getBroker());
String clientId = MqttClient.generateClientId();
MemoryPersistence persistence = new MemoryPersistence();
MqttClient sampleClient = new MqttClient(config.getBroker(), clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setUserName(config.getUsername());
connOpts.setPassword(config.getPassword().toCharArray());
connOpts.setConnectionTimeout(mqttConnectTimeout);
connOpts.setKeepAliveInterval(mqttKeppAliveInterval);
System.out.println("run receive...");
sampleClient.setCallback(new controllers.PushCallback());
MqttTopic mtopic = sampleClient.getTopic(config.getTopic());
connOpts.setWill(mtopic, "close".getBytes(), 0, true);
sampleClient.connect(connOpts);
int[] Qos = {0};
String[] topic1 = {config.getTopic()};
sampleClient.subscribe(topic1, Qos);
}
这与mosquitto无关,它是您正在使用的客户端。看到这个答案:Send many publish message: Too many publishes in progress Error
让我说明一下,如何自己解决这些问题 -
打开Paho客户端的Github网页,搜索“正在进行的过多发布”:
这将引导您转到“32202”字符串,再次搜索它。
这导致您(在跳过一些本地化文件之后)到Java源代码文件MqttException.java中的常量:
public static final short REASON_CODE_MAX_INFLIGHT = 32202;
再次,搜索“REASON_CODE_MAX_INFLIGHT” - 最终将您带到文件ClientState.java:
if (actualInFlight >= this.maxInflight) {
//@TRACE 613= sending {0} msgs at max inflight window
log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)});
throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
}
所以你需要调整maxInflight
属性。
几乎没有更多的搜索,你发现它可以通过调用你传递给Paho客户端的setMaxInflight(int maxInflight)
方法的MqttConnectionOptions对象上的connect
方法来设置。