.net 客户端不断连接和断开连接

问题描述 投票:0回答:0

我在 VS2019 中运行一个 Windows 服务项目,带有

Mqtt 4.1.4 .Net
框架客户端。

我的 .net 客户端一遍又一遍地保持

Connecting and Disconnecting
.

我最近发现我的

OnSubscriberDisconnected
方法传递了以下
args
值:

args.Reason = SessionTakenOver

args.ReasonString = "Another client connected with the same client id."

最初我一直在每个 HiveMq Broker 连接(免费云版本)上创建一个新的随机 ClientID,但我将其更改为:

 clientId = ".netWinSvc-" + this.machineName;

这样运行我的 Win 服务代码的机器将始终使用相同的 ClientID 连接。

我相信我堆积了很多新的 ClientID 连接,并要求代理保持会话 (

CleanSession = false
)。并且免费的云订阅允许
100 device connections
.

问题是: 我该怎么做才能清理所有这些 clientID 连接,以及如何避免此断开/重新连接问题?将相同的 ClientID 与

CleanSession = false
一起使用是最好的方法吗?换句话说,我不应该要求经纪人
persist
我的 ClientID 连接吗?

这是我的 .Net 窗口服务代码的重要部分:

using log4net.Ext.EventID;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Formatter;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Reflection;
using System.Security.Cryptography.X509Certificates;
using System.ServiceProcess;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using Timer = System.Timers.Timer;
using MqttNotificationService.Models;
using Newtonsoft.Json;

namespace MqttNotificationService
  {
    public partial class MqttService : ServiceBase
    {
        public static readonly IEventIDLog applog = EventIDLogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

        private IManagedMqttClient managedMqttClientPublisher;
        private IManagedMqttClient managedMqttClientSubscriber;
        private string mqttClientUser = "";
        private byte[] mqttClientPswd;

        private string mqttBrokerAddress;
        private string mqttProtocol = "wss";
        private int? mqttPort;
        private string defaultMessage;

        private string topicThisHost = "";      // get topic from app.config
        private string heartBeatPubMsg;
        private double heartBeatTimerMs;
        
    public MqttService()
    {
        InitializeComponent();
    }

    protected override void OnStart(string[] args)
    {
        Init();
        CreateThreadAndRun();
        StartHeartBeatTimer();
    }

      private void Init()
      {
        log4net.Config.XmlConfigurator.Configure();
        mqttClientUser = ConfigurationManager.AppSettings["MqttClientUser"];
        mqttBrokerAddress = ConfigurationManager.AppSettings["MqttBrokerAddress"];
        mqttProtocol = ConfigurationManager.AppSettings["MqttProtocol"];
        mqttPort = Int16.Parse(ConfigurationManager.AppSettings["MqttPort"]);
        MqttUseTls = bool.Parse(ConfigurationManager.AppSettings["UseTlsCertificate"]);
        var MqttQos = Int16.Parse(ConfigurationManager.AppSettings["QualityOfService"]);
        mqttRetained = bool.Parse(ConfigurationManager.AppSettings["MqttRetained"]);
        mqttLastWillRetained = bool.Parse(ConfigurationManager.AppSettings["MqttLastWillRetained"]);
        mqttLastWillMessage = ConfigurationManager.AppSettings["MqttLastWillMessage"];
        mqttKeepAliveSeconds = Int16.Parse(ConfigurationManager.AppSettings["MqttLastWillKeepAliveSeconds"]);
        CertificateFileName = ConfigurationManager.AppSettings["CertificateFileName"];
        CertificatePwd = ConfigurationManager.AppSettings["CertificatePswd"];

        defaultMessage = ConfigurationManager.AppSettings["DefaultPubMessage"];
        topicSubFromHar = ConfigurationManager.AppSettings["MqttTopicSubFromHar"];
        topicThisHost = ConfigurationManager.AppSettings["MqttTopicThisHost"];
        heartBeatPubMsg = ConfigurationManager.AppSettings["HeartBeatPubMessage"];
        heartBeatTimerMs = Double.Parse(ConfigurationManager.AppSettings["HeartBeatTimerMs"]);
        pingDicom = bool.Parse(ConfigurationManager.AppSettings["CheckDicomServers"]);
        SynergyHostName = ConfigurationManager.AppSettings["SynergyHostName"];

        machineName = Dns.GetHostName();
        hostIp = Dns.GetHostEntry(machineName)
            .AddressList
            .FirstOrDefault(ip => ip.AddressFamily == AddressFamily.InterNetwork)
            .ToString();
       
        clientId = ".netWinSvc-" + this.machineName;

        QosThisHost = MqttQualityOfServiceLevel.AtLeastOnce;
        switch (MqttQos)
        {
            case 0:
                QosThisHost = MqttQualityOfServiceLevel.AtLeastOnce;
                break;
            case 1:
                QosThisHost = MqttQualityOfServiceLevel.AtMostOnce;
                break;
            case 2:
                QosThisHost = MqttQualityOfServiceLevel.ExactlyOnce;
                break;
        }
      }

      public void CreateThreadAndRun()
      {
          Thread m_Thread = new Thread(new ThreadStart(StartPublisherAndSubscriber));
          m_Thread.SetApartmentState(ApartmentState.STA);
          m_Thread.Name = "MT";
          m_Thread.Priority = ThreadPriority.Highest;
          m_Thread.Start();
      }

      private void StartPublisherAndSubscriber()
      {
          StartSubscriber();
          _ = StartPublisher();
          CheckOtherServers();
      }

      private void StartHeartBeatTimer()
      {
          TimeSpan ts = new TimeSpan(0, 0, 5);
          Thread.Sleep(ts);

          Timer timer = new Timer();
          timer.Elapsed += new ElapsedEventHandler(PublishHeartBeat);
          timer.Interval = heartBeatTimerMs;
          timer.Enabled = true;
      }

      private void PublishHeartBeat(object source, ElapsedEventArgs e)
      {
          var message = $"{ this.heartBeatPubMsg}: { MyHostName} {hostIp}";
          _ = this.Publish(message, topicThisHost);
          this.CheckOtherServers();
      }

       public async Task Publish(string messageIn, string topic, IManagedMqttClient pubClient = null)
      {
          MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce;
          switch (MqttQos)
          {
              case 0:
                  qos = MqttQualityOfServiceLevel.AtLeastOnce;
                  break;
              case 1:
                  qos = MqttQualityOfServiceLevel.AtMostOnce;
                  break;
              case 2:
                  qos = MqttQualityOfServiceLevel.ExactlyOnce;
                  break;
          }
          MqttModel message = new MqttModel();
          message.message = messageIn;
          message.datestamp = DateTime.Now;
          message.source = "";
          message.status = "";
          var payload = JsonConvert.SerializeObject(message, Formatting.Indented);

          var send = new MqttApplicationMessageBuilder()
              .WithTopic(topic)
              .WithPayload(payload)
              .WithQualityOfServiceLevel(qos)
              .WithRetainFlag(false)
              .Build();

          if (this.managedMqttClientPublisher == null)
          {
              this.managedMqttClientPublisher = pubClient;
          }

          if (this.managedMqttClientPublisher != null)
          {
              try
              {
                  applog.Debug($"Mqtt Service Publish() method - about to pub mqtt message EnqueueAsync() - {messageIn} / {topic} ");
                  await this.managedMqttClientPublisher.EnqueueAsync(send);
                  MonitoringLogs logs = new MonitoringLogs();
                  logs.InsertIntoLog(message);
              }
              catch (Exception ex)
              {
                  string errorMessage = $"Exception occured in Publish() method. {ex.Message}";
                  applog.Error(errorMessage);
                  throw new Exception(errorMessage);
              }
          }
          else
          {
              applog.Info($"Mqtt Service Publish() method - managedMqttClientPublisher object appears to be NULL");
          }
      }
  
  public ManagedMqttClientOptions WsSecureClientOptions()
        {
            string assemblyPath = Path.GetDirectoryName(Assembly.GetAssembly(typeof(MqttService)).CodeBase);

            // Building out the secure wss url (both pfx/crt certificate file types appear to work here)
            var url = $"{mqttBrokerAddress}:{mqttPort}/mqtt";

            X509Certificate2 x509Cert = null;
            var file = CertificateFileName;
            var filePath = Path.Combine(assemblyPath, file).Remove(0, 6);

            // pfx file contains both pub and priv keys (needs pswd); crt file only has pub key (no pswd req'd)
            if (Path.GetExtension(CertificateFileName.ToLower()) == ".pfx")
            {
                // using a PFX cert file via the X509 class
                x509Cert = new X509Certificate2(filePath, CertificatePwd);
            }
            else if (Path.GetExtension(CertificateFileName.ToLower()) == ".crt")
            {
                x509Cert = new X509Certificate2(filePath);
            }

            applog.Debug($"In WsSecureClientOptions(), Certificate Path - {filePath}");          

            var clientOptionsBldr = new MqttClientOptionsBuilder()
                                        .WithProtocolVersion(MqttProtocolVersion.V500)
                                        .WithWebSocketServer(url)
                                        .WithCredentials(mqttClientUser, mqttClientPswd)
                                        .WithClientId(clientId)
                                        .WithCleanSession()
                                        .WithCredentials(mqttClientUser, mqttClientPswd)
                                        .WithTls(
                                            new MqttClientOptionsBuilderTlsParameters()
                                            {
                                                UseTls = true,
                                                SslProtocol = System.Security.Authentication.SslProtocols.Tls12,
                                                Certificates = new List<X509Certificate2>() { x509Cert }                                              
                                            });
            ManagedMqttClientOptions managedClientOptions = null;
            try
            {
                applog.Debug($"In WsSecureClientOptions(), about to Build Publisher - ${url}");
                managedClientOptions = new ManagedMqttClientOptionsBuilder()
                                                                .WithClientOptions(clientOptionsBldr)
                                                                .Build();
            }
            catch (Exception ex)
            {
                applog.Error("CERT ERROR ! Exception in WsSecureClientOptions() " + ex.Message);
            }

            return managedClientOptions;
        }

    private Task OnSubscriberConnected(MqttClientConnectedEventArgs _)
    {
        return Task.CompletedTask;
    }

    private Task OnSubscriberDisconnected(MqttClientDisconnectedEventArgs _)
    {
        return Task.CompletedTask;
    }
 }

mqtt hivemq mqtt.js
© www.soinside.com 2019 - 2024. All rights reserved.