设备断开连接时,尝试重新连接,但是所有mqtt连接正常,但是有一个现象,就是不进行mqtt订阅。
请帮助我。
当mqtt断开连接时,我想要尝试重新连接并使用mqtt connect然后mqtt subscribe的结果。
var cluster = require("cluster")
var mqtt = require("mqtt")
var net = require("net")
var tls = require("tls")
var mf = require("../messages")
//require('log-timestamp')
const EventEmitter = require("events")
var mqtt_host = process.env.THINGPLUG_HOST1 || 'localhost'
//var mqtt_host = process.env.THINGPLUG_HOST2 || '192.168.7.131'
var http_host = process.env.THINGPLUG_HTTP_HOST || mqtt_host
var checked_mqtt_host = mqtt_host
var http_port = 9000
var mqtt_port = 8883
var numConn = 0
var numReconnected = 0
var inReconnectDeviceCount = 0
var sendReqCount = 0
var recvReqCount = 0
var resCount = 0
var inQueue = 0
var portIdx = 10000
console.log = function() {
}
class Device {
constructor( serviceName, deviceName, deviceToken, mdn , dir ,connectAfter, telemetryInterval, localIp, deadInterval, reconnectInterval) {
this.requestTopic = "v1/dev/" + serviceName + "/" + deviceName + "/up"
this.responseTopic = "v1/dev/" + serviceName + "/" + deviceName + "/down"
this.telemetryTopic = "v1/dev/" + serviceName + "/" + deviceName + "/telemetry/csv"
this.deviceToken = deviceToken;
this.mdn = mdn;
this.dir = dir;
this.telemetryInterval = telemetryInterval;
this.connectAfter = connectAfter;
this.localIp = localIp;
this.connected = false;
this.deadInterval = deadInterval
this.isDead = false;
this.reconnectInterval = reconnectInterval;
this.isRecon = false;
//console.log("new Device token =%s ,mdn = %s, dir =%s, interval = %s", this.deviceToken, this.mdn , this.dir , this.telemetryInterval);
}
connect() {
var self = this;
if(self.isRecon == true) {
self.client = null
}
if(self.client == null) {
//console.log("connect. username = %s , clientId = %s", this.deviceToken, this.deviceToken)
var option = {
username: this.deviceToken,
clean: true,
clientId: this.deviceToken,
keepalive : 120,
protocol: 'mqtts',
rejectUnauthorized: false
}
self.client = new mqtt.MqttClient( (client) => {
var sp = mqtt_host.split(":")
var random = Math.floor(Math.random() * sp.length)
var host = sp[random]
//console.error("mqtt_host %s %s %s", mqtt_host, host, random)
var socketOption = {
rejectUnauthorized: false,
host : host ,
port : mqtt_port
}
if(self.localIp != null) {
socketOption.localAddress = self.localIp;
socketOption.localPort = portIdx;
portIdx++
}
var ret= tls.connect ( socketOption , () => {
})
ret.on("error", (err) => {
this.disconnect()
if(err.code != 'ECONNREFUSED' && err.code != 'ETIMEDOUT' && err.code != 'ECONNRESET') {
console.error("ret errror = " + err + " " + self.responseTopic)
}
})
return ret;
} , option )
self.client.subscribe(self.responseTopic);
self.client.on("error", (err) => {
console.error("on errror = " + err + " " + self.responseTopic)
if(self.connected == true) {
self.connected = false;
numConn--
}
})
self.client.on("close", (err) => {
if(self.connected == true) {
self.connected = false;
numConn--
this.disconnect()
}
})
self.client.on('connect', function () {
self.client.subscribe(self.responseTopic);
if(self.isRecon == true) {
self.isRecon = false;
inReconnectDeviceCount--;
numReconnected++;
}
numConn++
//console.log("Client connected! username = %s , clientId = %s", self.deviceToken, self.deviceToken)
self.connected = true;
self.isDead = false;
if(self.deadInterval > 0 ) {
setTimeout( () => {
self.makeDead()
}, Math.random() * self.deadInterval )
}
});
self.client.on("message", function (topic, message) {
var parsed = JSON.parse(message.toString())
if(parsed.cmd == "jsonRpc") {
if(parsed.rpcReq.method == "tp_adt_reqsenddata") {
recvReqCount++
}
else {
resCount++
inQueue--
if(parsed.rpcReq.params.rc != "000") {
console.error("receive error message topic = %s , message = %s", topic, message.toString());
}
}
var res = {
cmd : "jsonRpc" ,
cmdId : parsed.cmdId ,
serviceName : parsed.serviceName ,
deviceName : parsed.deviceName ,
result : "",
rpcRsp : {
jsonrpc : "2.0" ,
result : {
mid : parsed.rpcReq.params.mid ,
mdn : parsed.rpcReq.params.mdn,
pos : parsed.rpcReq.params.pos ,
rc : "000",
pld : Buffer.from(`This is response from ${self.mdn}`).toString("base64")
} ,
id : parsed.rpcReq.id
}
}
self.client.publish(self.requestTopic , JSON.stringify(res), { qos : 0})
}
//client.end(false);
});
}
}
disconnect () {
var self = this;
self.connected = false;
self.isDead = true;
if(self.isRecon != true) {
self.isRecon = true;
inReconnectDeviceCount++
}
self.client.end();
if(self.reconnectInterval > 0) {
self.reconnect();
}
}
reconnect () {
var self = this;
setTimeout( () => {
self.connect();
}, Math.round(Math.random() * self.reconnectInterval))
}
makeDead () {
var self = this;
var disconnectedTime = new Date().toString()
console.error("mdn %s is Dead at %s" , this.mdn, disconnectedTime)
self.isDead = true
self.client.end(false)
setTimeout( () => {
console.error("It's been 5mins. Check Heart Beat Error Noti for mdn %s Now. current = %s, disconnected = %s" , self.mdn, new Date().toString(), disconnectedTime)
self.connect();
}, 5 * 60 * 1000 )
setTimeout( () => {
self.isDead = false
console.error("mdn %s is Alive at %s" , self.mdn, new Date().toString())
self.connect();
}, 10 * 60 * 1000 )
}
sendTelemetry() {
var self = this;
if(self.connected == true && self.isDead == false) {
self.connect();
if(self.connected == true) {
var payload = "JfH5+ED5+fjwQPH4QNnz9PBA8fBAw/Dw8EDw8fLy9/Xw8Pb09EDw8PIN"
//for(var i = 0; i < (30 + Math.floor(Math.random()*20)); i++) {
// payload = payload + Math.random().toString(36).substr(2,1)
//}
//payload = Buffer.from(payload).toString('base64')
var mid = mf.generateTimestamp()
var csvTelemetry = mid + "," + self.mdn + "," + self.dir + "," + payload + ",,,,,,,,,,,"
//var payload = Buffer.from(`mdn = ${self.mdn}`).toString('base64')
// var telemetry = {
// "mid": mf.generateTimestamp(),
// "mdn": self.mdn,
// "dir" : self.dir,
// "pld": payload
// };
sendReqCount++
inQueue++
self.client.publish(self.telemetryTopic, csvTelemetry, { qos: 0 })
}
}
setTimeout( () => {
self.sendTelemetry()
}, self.telemetryInterval )
}
run() {
var self = this;
if(self.connectAfter> 0) {
console.log("wait %s", self.connectAfter)
setTimeout( () => {
self.connect();
}, self.connectAfter )
} else {
self.connect()
}
if(self.telemetryInterval > 0) {
var wait = Math.random() * self.telemetryInterval;
setTimeout( () => {
self.sendTelemetry()
}, wait)
}
}
}
module.exports = function(settings) {
var services = settings.services;
if(cluster.isMaster) {
services.forEach( (v, idx) => {
var worker = cluster.fork();
var localIp = null;
if(settings.ip_addrs!=null) {
localIp = settings.ip_addrs[idx]
}
worker.send( {
settings : settings,
service : v ,
localIp : localIp
} )
})
} else {
process.on("message", (msg) => {
var numDevice = msg.settings.numDevice;
var startIdx = msg.settings.startIdx;
var telemetryInterval = msg.settings.telemetryInterval * 1000;
var connectCompleteTime = msg.settings.connectCompleteTime * 1000;
var deadInterval = 0
if(msg.settings.deadPer != null && msg.settings.deadPer > 0) {
deadInterval = numDevice * msg.settings.deadPer * 1000
}
var reconnectInterval = 0
if(msg.settings.reconnectInterval != null) {
reconnectInterval = msg.settings.reconnectInterval * 1000;
}
var json = require(msg.service)
setInterval( () => {
console.error("%s : numConn = %s(%s) , send req/s = %s, res/s = %s, inQueue = %s , recv req/s = %s, inRec = %s", json.serviceName, numConn, numDevice, sendReqCount/2, resCount/2, inQueue , recvReqCount/2, inReconnectDeviceCount)
sendReqCount = 0
resCount = 0
recvReqCount = 0
} , 2000)
for(var i=startIdx;i<(startIdx+numDevice);i++) {
var deviceName = json.serviceName + "Dev" + i;
var deviceToken = json.serviceName + "Token" + i;
var mdnPrefixLength = json.mdnPrefix.length
var mdn = json.mdnPrefix + String(i).padStart(11 - mdnPrefixLength , '0')
var randomWait = Math.round(Math.random() * connectCompleteTime )
var d = new Device(json.serviceName, deviceName, deviceToken, mdn, "7" , randomWait, telemetryInterval , msg.localIp, deadInterval, reconnectInterval )
d.run()
}
})
}
}
if ( require.main == module ) {
if (process.argv.length < 3) {
console.log("Usage : node %s settings.json", process.argv[1]);
return;
}
var settings = require(process.argv[2])
module.exports(settings);
}
请帮助我。