当从 node.js 源重新连接 mqtt 连接时,mqtt 订阅不会发生

问题描述 投票:0回答:0

设备断开连接时,尝试重新连接,但是所有mqtt连接正常,但是有一个现象,就是不进行mqtt订阅。

请帮助我。

当mqtt断开连接时,我想要尝试重新连接并使用mqtt connect然后mqtt subscribe的结果。

var cluster = require("cluster")
var mqtt = require("mqtt")
var net = require("net")
var tls = require("tls")
var mf = require("../messages")
//require('log-timestamp')

const EventEmitter = require("events")

var mqtt_host = process.env.THINGPLUG_HOST1 || 'localhost'
//var mqtt_host = process.env.THINGPLUG_HOST2 || '192.168.7.131'
var http_host = process.env.THINGPLUG_HTTP_HOST || mqtt_host
var checked_mqtt_host = mqtt_host

var http_port = 9000
var mqtt_port = 8883


var numConn = 0
var numReconnected = 0
var inReconnectDeviceCount = 0
var sendReqCount = 0
var recvReqCount = 0

var resCount = 0
var inQueue  = 0

var portIdx = 10000

console.log = function() {

}


class Device {
    constructor( serviceName, deviceName, deviceToken,  mdn , dir ,connectAfter, telemetryInterval,  localIp, deadInterval, reconnectInterval) {

        this.requestTopic = "v1/dev/" + serviceName + "/" + deviceName + "/up"
        this.responseTopic = "v1/dev/" + serviceName + "/" + deviceName + "/down"
        this.telemetryTopic = "v1/dev/" + serviceName + "/" + deviceName + "/telemetry/csv"

        this.deviceToken = deviceToken;
        this.mdn = mdn;
        this.dir = dir;
        this.telemetryInterval = telemetryInterval;
        this.connectAfter = connectAfter;
        this.localIp = localIp;
        this.connected = false;
        this.deadInterval = deadInterval

        this.isDead = false;
        this.reconnectInterval = reconnectInterval;
        this.isRecon = false;

        //console.log("new Device token =%s ,mdn = %s, dir =%s, interval = %s", this.deviceToken, this.mdn , this.dir , this.telemetryInterval);
    }

    connect() {

        var self = this;
        if(self.isRecon == true) {
            self.client = null
        }
        if(self.client == null) {
            //console.log("connect. username = %s , clientId = %s", this.deviceToken, this.deviceToken)

            var option = {
                username: this.deviceToken,
                clean: true,
                clientId: this.deviceToken,
                keepalive : 120,
                protocol: 'mqtts',
                rejectUnauthorized: false
            }
            self.client = new mqtt.MqttClient( (client) => {

                var sp = mqtt_host.split(":")
                var random = Math.floor(Math.random() * sp.length)
                var host = sp[random]
                //console.error("mqtt_host %s %s %s", mqtt_host, host, random)

                var socketOption = {
                    rejectUnauthorized: false,
                    host : host ,
                    port : mqtt_port
                }

                if(self.localIp != null) {
                    socketOption.localAddress = self.localIp;
                    socketOption.localPort = portIdx;
                    portIdx++
                }

                var ret= tls.connect ( socketOption , () => {
                })
                ret.on("error", (err) => {
                    this.disconnect()

                    if(err.code != 'ECONNREFUSED' &&  err.code != 'ETIMEDOUT' && err.code != 'ECONNRESET') {
                        console.error("ret errror = " +  err + "  " + self.responseTopic)
                    }
                })
                return ret;
            } , option )

            self.client.subscribe(self.responseTopic);

            self.client.on("error", (err) => {
                console.error("on errror = " +  err + "  " + self.responseTopic)
                if(self.connected == true) {
                    self.connected = false;
                    numConn--
                }
            })

            self.client.on("close", (err) => {
                if(self.connected == true) {
                    self.connected = false;
                    numConn--
                    this.disconnect()
                }
            })

            self.client.on('connect', function () {
                self.client.subscribe(self.responseTopic);
                if(self.isRecon == true) {
                    self.isRecon = false;
                    inReconnectDeviceCount--;
                    numReconnected++;
                }
                numConn++
                //console.log("Client connected! username = %s , clientId = %s", self.deviceToken, self.deviceToken)
              self.connected = true;
              self.isDead = false;

              if(self.deadInterval > 0 ) {
                 setTimeout( () => {
                     self.makeDead()
                 }, Math.random() * self.deadInterval )
              }
            });

            self.client.on("message", function (topic, message) {


                var parsed = JSON.parse(message.toString())
                if(parsed.cmd == "jsonRpc") {
                    if(parsed.rpcReq.method == "tp_adt_reqsenddata") {
                        recvReqCount++

                    }
                    else {
                        resCount++
                        inQueue--

                        if(parsed.rpcReq.params.rc != "000") {
                            console.error("receive error message topic = %s , message = %s", topic, message.toString());
                        }

                    }
                    var res = {
                        cmd : "jsonRpc" ,
                        cmdId : parsed.cmdId ,
                        serviceName : parsed.serviceName ,
                        deviceName : parsed.deviceName ,
                        result : "",
                        rpcRsp : {
                            jsonrpc : "2.0" ,
                            result : {
                                mid : parsed.rpcReq.params.mid ,
                                mdn : parsed.rpcReq.params.mdn,
                                pos : parsed.rpcReq.params.pos ,
                                rc : "000",
                                pld : Buffer.from(`This is response from ${self.mdn}`).toString("base64")
                            } ,
                            id : parsed.rpcReq.id
                        }
                    }
                    self.client.publish(self.requestTopic , JSON.stringify(res), { qos : 0})
                }
                //client.end(false);
            });
        }

    }

    disconnect () {
        var self = this;
        self.connected = false;
        self.isDead = true;
        if(self.isRecon != true) {
            self.isRecon = true;
            inReconnectDeviceCount++
        }
        self.client.end();
        if(self.reconnectInterval > 0) {
            self.reconnect();
        }
    }

    reconnect () {
        var self = this;
        setTimeout( () => {
            self.connect();
        },  Math.round(Math.random() * self.reconnectInterval))
    }

    makeDead () {
        var self = this;

        var disconnectedTime = new Date().toString()
        console.error("mdn %s is Dead at %s" , this.mdn, disconnectedTime)
        self.isDead = true
        self.client.end(false)
        setTimeout( () => {

            console.error("It's been 5mins. Check Heart Beat Error Noti for mdn %s Now.  current = %s, disconnected = %s" , self.mdn, new Date().toString(), disconnectedTime)

            self.connect();
        }, 5 * 60 * 1000 )

        setTimeout( () => {
            self.isDead = false
            console.error("mdn %s is Alive at %s" , self.mdn, new Date().toString())

            self.connect();
        }, 10 * 60 * 1000 )
    }

    sendTelemetry() {

        var self = this;
        if(self.connected == true && self.isDead == false) {
            self.connect();
            if(self.connected == true) {
                var payload = "JfH5+ED5+fjwQPH4QNnz9PBA8fBAw/Dw8EDw8fLy9/Xw8Pb09EDw8PIN"
                //for(var i = 0; i < (30 + Math.floor(Math.random()*20)); i++) {
                //    payload = payload + Math.random().toString(36).substr(2,1)
                //}
                //payload = Buffer.from(payload).toString('base64')

                var mid = mf.generateTimestamp()
                var csvTelemetry = mid + "," + self.mdn + "," + self.dir + "," + payload + ",,,,,,,,,,,"

                //var payload = Buffer.from(`mdn = ${self.mdn}`).toString('base64')
                // var telemetry = {
                //     "mid": mf.generateTimestamp(),
                //     "mdn": self.mdn,
                //     "dir" : self.dir,
                //     "pld": payload
                // };

                sendReqCount++
                inQueue++

                self.client.publish(self.telemetryTopic, csvTelemetry, { qos: 0 })
            }
        }

        setTimeout( () => {
            self.sendTelemetry()
        }, self.telemetryInterval )
    }

    run() {
        var self = this;
        if(self.connectAfter> 0) {
            console.log("wait %s", self.connectAfter)
            setTimeout( () => {
                self.connect();


            }, self.connectAfter )
        } else {
            self.connect()
        }

        if(self.telemetryInterval > 0) {
            var wait = Math.random() * self.telemetryInterval;
            setTimeout( () => {
                self.sendTelemetry()
            }, wait)

        }

    }
}

module.exports = function(settings) {
    var services = settings.services;

    if(cluster.isMaster) {
        services.forEach( (v, idx) => {
            var worker = cluster.fork();
            var localIp = null;
            if(settings.ip_addrs!=null) {
                localIp = settings.ip_addrs[idx]
            }
            worker.send( {
                settings : settings,
                service : v ,
                localIp : localIp
            } )
        })
    } else {


        process.on("message", (msg) => {




            var numDevice = msg.settings.numDevice;
            var startIdx = msg.settings.startIdx;
            var telemetryInterval = msg.settings.telemetryInterval * 1000;
            var connectCompleteTime = msg.settings.connectCompleteTime * 1000;

            var deadInterval = 0
            if(msg.settings.deadPer != null && msg.settings.deadPer > 0) {
                deadInterval = numDevice * msg.settings.deadPer * 1000
            }

            var reconnectInterval = 0
            if(msg.settings.reconnectInterval != null) {
                reconnectInterval = msg.settings.reconnectInterval * 1000;
            }

            var json = require(msg.service)
            setInterval( () => {
                console.error("%s : numConn = %s(%s) , send req/s = %s,  res/s = %s, inQueue = %s , recv req/s = %s, inRec = %s", json.serviceName, numConn, numDevice, sendReqCount/2, resCount/2, inQueue , recvReqCount/2, inReconnectDeviceCount)
                sendReqCount = 0
                resCount = 0
                recvReqCount = 0
            } , 2000)

            for(var i=startIdx;i<(startIdx+numDevice);i++) {
                var deviceName = json.serviceName + "Dev" + i;
                var deviceToken = json.serviceName + "Token" + i;

                var mdnPrefixLength = json.mdnPrefix.length

                var mdn = json.mdnPrefix +  String(i).padStart(11 - mdnPrefixLength , '0')

                var randomWait = Math.round(Math.random() * connectCompleteTime )

                var d = new Device(json.serviceName, deviceName, deviceToken, mdn, "7" , randomWait, telemetryInterval , msg.localIp, deadInterval, reconnectInterval )
                d.run()
            }

        })
    }
}

if ( require.main == module ) {

    if (process.argv.length < 3) {
        console.log("Usage : node %s settings.json", process.argv[1]);
        return;
    }

    var settings = require(process.argv[2])

    module.exports(settings);
}

请帮助我。

node.js deployment mqtt simulator
© www.soinside.com 2019 - 2024. All rights reserved.