我在MQTT新手。
我在Java中实现MQTT,我使用下面的代码发布者发布到particualr话题,
public void publish()
{
MqttClient myClient = null;
MqttConnectOptions connOpt;
try {
// Subscription with Brokers
connOpt = new MqttConnectOptions();
connOpt.setAutomaticReconnect(true);
connOpt.setCleanSession(true);//if your not setting cleanSession to false then subscriptions shouldn't be persisted.
String clientID = UUID.randomUUID().toString().replace("-", "");
System.out.println("clientID " + clientID);
myClient = new MqttClient("tcp://192.168.10.500:1883", clientID);
myClient.connect(connOpt);
String myTopic = "Device1";
MqttTopic topic = myClient.getTopic(myTopic);
int pubQoS = 0;
MqttMessage message = new MqttMessage("mqttMessage".getBytes());
message.setQos(pubQoS);
message.setRetained(false);
MqttDeliveryToken token = null;
token = topic.publish(message);
System.out.println("publish successful with the message :: " + message);
// Wait until the message has been delivered to the broker
token.waitForCompletion();
} catch (MqttException me) {
} catch (Exception e) {
}
}
然后我使用下面的代码读取所发布的消息对特定主题的用户,
public void subscribe()
{
try {
MqttConnectOptions connOpt;
// Subscription with mqttBrokerEndPoint
connOpt = new MqttConnectOptions();
connOpt.setAutomaticReconnect(true);
connOpt.setCleanSession(true);//if your not setting cleanSession to false then subscriptions shouldn't be persisted.
String clientID = UUID.randomUUID().toString().replace("-", "");
System.out.println("clientID " + clientID);
MqttSubscriber mqttConnection = new MqttSubscriber();
myClient = new MqttClient("tcp://192.168.10.500:1883" clientID);
myClient.setCallback(mqttConnection);
myClient.connect(connOpt);
myClient.subscribe("Device1");
} catch (MqttException e) {
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
try {
System.out.println(message);
boolean isValidClient = true;// Here i need to check if this is the valid subscriber for the message publised on topic "Device1"
//if(isValidClient) {
if(message != null) {
System.out.println("message" + message.toString());
}
myClient.unsubscribe("Device1");
myClient.disconnect();
//}
}
catch(Exception e){}
}
上述执行工作正常,因为它是。
因为我是很新的MQTT我有些怀疑上述实施。
1)在两个出版商和用户客户端ID应该是同为一个特定的流程?
或者应能在两个发布者和订阅不同如上:可随机产生?
String clientID = UUID.randomUUID().toString().replace("-", "");
这种随机生成的clientID的是工作的罚款既订阅和发布。
但是,如果我使用相同客户同时发布者和用户,并验证用户?
我的意思是使用subsriber“clientID的” myClient = new MqttClient(mqttBrokerEndPoint, "clientID");
和出版商则相同的“clientID的” myClient = new MqttClient(mqttBrokerEndPoint, "clientID");
我得到的MQTT经纪人控制台下面的套接字错误(使用Windows版),
1549414715: Socket error on client 82, disconnecting.
1549414715: New client connected from 192.168.10.500 as clientID (c1, k60).
1549414715: No will message specified.
1549414715: Sending CONNACK to 82 (0, 0)
1549414716: New connection from 192.168.10.500 on port 1883.
1549414716: Client 82 already connected, closing old connection.
1549414716: Socket error on client 82, disconnecting.
1549414716: New client connected from 192.168.10.500 as clientID (c1, k60).
1549414716: No will message specified.
1549414716: Sending CONNACK to 82 (0, 0)
1549414716: New connection from 192.168.10.500 on port 1883.
1549414716: Client 82 already connected, closing old connection.
1549414716: Socket error on client 82, disconnecting.
1549414716: New client connected from 192.168.10.500 as clientID (c1, k60).
1549414716: No will message specified.
1549414716: Sending CONNACK to 82 (0, 0)
和上述程序无法正常工作。
我们不能用同样的clientID
为双方用户和出版商?为什么会导致套接字错误和程序不工作?
2)是用户名和密码强制执行时?或者,我们可以建立不低于性质的2的连接?
connOpt.setUserName(USERNAME);
connOpt.setPassword(PASSWORD.toCharArray());
3)是pubQoS被强制用于出版商?我目前正在使用它作为零“0”?
MqttMessage message = new MqttMessage(mqttMessage.getBytes());
message.setQos(0);
message.setRetained(false);
还保留属性是强制性的发行商?
4)这2进行属性必需的,而订阅?我使用如下代码。
connOpt.setAutomaticReconnect(true);
connOpt.setCleanSession(true);//if your not setting cleanSession to
为假,则订阅不应该被坚持。
5)另外,一旦消息从MQTT出版者接收到MessageArrived回调如下,如何验证,如果这是有效的订户,并进一步逻辑继续?
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
try {
System.out.println(message);
boolean isValidClient = true;// Here i need to check if this is the valid subscriber for the message publised on topic "Device1"
//if valid subscriber only i need to read message publised on the topic "Device1"
**//if(isValidClient) {**
if(message != null) {
System.out.println("message" + message.toString());
}
myClient.unsubscribe("Device1");
myClient.disconnect();
//}
}
catch(Exception e){}
我的意思是哪个属性MQTT API,以检查该消息仅此用户发送他可以继续进一步使用到达回调消息接收到的消息?
即,可使用的API MQTT的属性,以检查是否所接收的消息是与当前的过程/步骤(在订户方案如下)
是主题为用户和发行商之间的唯一共同的属性,以验证用户在messageArrived回调?还是我们有其他共同的属性,以检查用户和出版商之间的有效合同?
或者我们应该用clientID的验证用户?但是,如果使用同一个客户端的用户和出版商我得到我在点数1所提到的套接字错误。
如何进一步开展有关这个?
1)您不能使用相同的客户端ID两个MQTT客户机。对MQTT客户机ID必须是唯一的。您可以处理不同的业务创建不同的主题。您可以参考下面的答案
Two paho.mqtt clients subscribing to the same client localy
2)用户名和口令是可选的。如果您的MQTT服务器配置为使用用户名和密码,然后你的客户端也需要通过用户名和密码MQTT服务器。请参阅下面的链接了解码使用
How can i connect a Java mqtt client with username and password to an emqttd(EMQ) broker?
如果你想为TLS先进水平GO / SSL安全
https://dzone.com/articles/secure-communication-with-tls-and-the-mosquitto-broker
3)QOS - 的服务质量(QoS)水平的质量是一个消息的发送者和限定递送的特定消息的保证的消息的接收器之间的协议。请参阅下面的链接,QoS的更多信息
https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels/
4)自动重新连接(true) - 如果MQTT服务器在运行时断开连接,然后客户端尝试重新连接到服务器。
干净的会话(真) - 在重新连接到MQTT服务器具有相同的客户端ID相关联的所有旧的会话将被删除。
干净的会话(false) - 在重新连接到MQTT服务器具有相同的客户端ID相关联的所有旧的会话将被保留。在某些情况下MQTT服务器保留基于QoS级别的消息。
5)试图创建多个操作多个主题。在消息到达方法可以检查哪个主题消息已经到来。然后,你可以调用基于不同的方法。下面代码的方式之一。你可以找到适合您的需要最好的方式。干杯!!!!!
@Override
public void deliveryComplete(IMqttDeliveryToken token)
{
try
{
if(token.getTopics()[0].equals("TOPIC_A"))
{
//Do something
}
else if(token.getTopics()[0].equals("TOPIC_B"))
{
//Do something
}
}
catch (Exception e)
{
MQTTLogger.logSevere(e);
}
}
JSON格式的数据
第一条消息
{
"client-name":"client1"
"data":""
}
第二条消息
{
"client-name":"client2"
"data":""
}