我在 VS2019 中运行一个 Windows 服务项目,其中使用了 MQTTnet 4.1.4 的 .NET Framework 客户端。
我的 .NET 客户端一直在反复地连接和断开(Connecting and Disconnecting)。
我最近发现,我的 OnSubscriberDisconnected 方法收到了以下 args 值:
args.Reason = SessionTakenOver
args.ReasonString = "Another client connected with the same client id."
最初我一直在每个 HiveMq Broker 连接(免费云版本)上创建一个新的随机 ClientID,但我将其更改为:
clientId = ".netWinSvc-" + this.machineName;
这样运行我的 Win 服务代码的机器将始终使用相同的 ClientID 连接。
我认为我堆积了很多新的 ClientID 连接,并且还要求代理(broker)保留会话(CleanSession = false),而免费的云订阅只允许 100 个设备连接(100 device connections)。
问题是:我该怎么做才能清理所有这些 ClientID 连接,以及如何避免这种断开/重新连接的问题?将相同的 ClientID 与 CleanSession = false 一起使用是最好的方法吗?换句话说,我是否不应该要求代理(broker)持久化(persist)我的 ClientID 连接?
这是我的 .NET Windows 服务代码的重要部分:
using log4net.Ext.EventID;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Formatter;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Reflection;
using System.Security.Cryptography.X509Certificates;
using System.ServiceProcess;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using Timer = System.Timers.Timer;
using MqttNotificationService.Models;
using Newtonsoft.Json;
namespace MqttNotificationService
{
public partial class MqttService : ServiceBase
{
// Shared log4net logger, resolved from the type that declares the current method.
public static readonly IEventIDLog applog = EventIDLogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
// Managed client that Publish() enqueues outgoing messages on; Publish() can also adopt a client passed to it while this is still null.
private IManagedMqttClient managedMqttClientPublisher;
// Subscriber-side managed client; it is not assigned or used in the visible part of this class.
private IManagedMqttClient managedMqttClientSubscriber;
// Broker user name, read from the "MqttClientUser" app setting in Init().
private string mqttClientUser = "";
// Broker password bytes handed to WithCredentials(). NOTE(review): never assigned in the visible code - confirm where it is populated.
private byte[] mqttClientPswd;
// Broker address from the "MqttBrokerAddress" app setting; forms the host part of the WebSocket URL.
private string mqttBrokerAddress;
// Defaults to "wss" and is overwritten from the "MqttProtocol" app setting in Init().
private string mqttProtocol = "wss";
// Broker port from the "MqttPort" app setting.
private int? mqttPort;
// Value of the "DefaultPubMessage" app setting.
private string defaultMessage;
private string topicThisHost = ""; // topic for this host, read from the "MqttTopicThisHost" app setting
// Text prefix of the heartbeat message ("HeartBeatPubMessage" app setting).
private string heartBeatPubMsg;
// Heartbeat timer interval in milliseconds ("HeartBeatTimerMs" app setting).
private double heartBeatTimerMs;
/// <summary>
/// Creates the service and calls InitializeComponent(), which is defined in
/// another part of this partial class.
/// </summary>
public MqttService() => InitializeComponent();
/// <summary>
/// Service start hook: loads configuration, launches the MQTT worker thread,
/// then starts the heartbeat timer.
/// </summary>
/// <param name="args">Start arguments; not used by this method.</param>
protected override void OnStart(string[] args)
{
// Init() has to run first: StartHeartBeatTimer() reads heartBeatTimerMs, which Init() loads.
Init();
CreateThreadAndRun();
// NOTE: StartHeartBeatTimer() sleeps for 5 seconds before it returns, so OnStart blocks for at least that long.
StartHeartBeatTimer();
}
/// <summary>
/// Configures log4net and loads every app.config setting this service uses
/// into its fields, then derives the host name, host IPv4 address, MQTT
/// client id and the QoS level used for this host.
/// </summary>
/// <remarks>
/// Parsing failures (missing or malformed settings) are not caught here and
/// propagate to the caller.
/// </remarks>
private void Init()
{
    log4net.Config.XmlConfigurator.Configure();

    var settings = ConfigurationManager.AppSettings;

    mqttClientUser = settings["MqttClientUser"];
    mqttBrokerAddress = settings["MqttBrokerAddress"];
    mqttProtocol = settings["MqttProtocol"];
    // TCP ports range up to 65535, which does not fit in Int16; the field is int?.
    mqttPort = int.Parse(settings["MqttPort"]);
    MqttUseTls = bool.Parse(settings["UseTlsCertificate"]);
    // NOTE(review): this value only lives in a local. Publish() switches on a
    // member named MqttQos that is not assigned here - confirm it is populated
    // elsewhere, otherwise Publish() never sees the configured QoS.
    var qosSetting = Int16.Parse(settings["QualityOfService"]);
    mqttRetained = bool.Parse(settings["MqttRetained"]);
    mqttLastWillRetained = bool.Parse(settings["MqttLastWillRetained"]);
    mqttLastWillMessage = settings["MqttLastWillMessage"];
    mqttKeepAliveSeconds = Int16.Parse(settings["MqttLastWillKeepAliveSeconds"]);
    CertificateFileName = settings["CertificateFileName"];
    CertificatePwd = settings["CertificatePswd"];
    defaultMessage = settings["DefaultPubMessage"];
    topicSubFromHar = settings["MqttTopicSubFromHar"];
    topicThisHost = settings["MqttTopicThisHost"];
    heartBeatPubMsg = settings["HeartBeatPubMessage"];
    heartBeatTimerMs = Double.Parse(settings["HeartBeatTimerMs"]);
    pingDicom = bool.Parse(settings["CheckDicomServers"]);
    SynergyHostName = settings["SynergyHostName"];

    machineName = Dns.GetHostName();
    // A host without any IPv4 address yields null from FirstOrDefault; fall
    // back to an empty string instead of throwing NullReferenceException.
    hostIp = Dns.GetHostEntry(machineName)
        .AddressList
        .FirstOrDefault(ip => ip.AddressFamily == AddressFamily.InterNetwork)
        ?.ToString() ?? string.Empty;

    clientId = ".netWinSvc-" + this.machineName;

    // MQTT QoS numbering: 0 = at most once, 1 = at least once, 2 = exactly once.
    switch (qosSetting)
    {
        case 0:
            QosThisHost = MqttQualityOfServiceLevel.AtMostOnce;
            break;
        case 1:
            QosThisHost = MqttQualityOfServiceLevel.AtLeastOnce;
            break;
        case 2:
            QosThisHost = MqttQualityOfServiceLevel.ExactlyOnce;
            break;
        default:
            // Unrecognized values keep the previous fallback level.
            QosThisHost = MqttQualityOfServiceLevel.AtLeastOnce;
            break;
    }
}
/// <summary>
/// Starts StartPublisherAndSubscriber() on a dedicated STA thread named "MT"
/// that runs at the highest thread priority.
/// </summary>
public void CreateThreadAndRun()
{
    var worker = new Thread(StartPublisherAndSubscriber)
    {
        Name = "MT",
        Priority = ThreadPriority.Highest
    };

    // The apartment state has to be chosen before the thread is started.
    worker.SetApartmentState(ApartmentState.STA);
    worker.Start();
}
/// <summary>
/// Worker-thread entry point: starts the subscriber, kicks off the publisher
/// and then checks the other servers. The three callees are defined outside
/// the visible part of this class.
/// </summary>
private void StartPublisherAndSubscriber()
{
StartSubscriber();
// The value returned by StartPublisher() is deliberately discarded, so this method does not wait on it.
_ = StartPublisher();
CheckOtherServers();
}
// Holds the running heartbeat timer so that it stays referenced for the
// lifetime of the service and can be stopped and disposed later.
private Timer activeHeartBeatTimer;

/// <summary>
/// Waits five seconds, then starts a repeating timer that calls
/// PublishHeartBeat every heartBeatTimerMs milliseconds.
/// </summary>
/// <remarks>
/// The wait is a blocking Thread.Sleep, so the caller is held for five
/// seconds. If a timer from an earlier call is still around it is stopped
/// and disposed first, so only one heartbeat timer runs at a time.
/// </remarks>
private void StartHeartBeatTimer()
{
    Thread.Sleep(TimeSpan.FromSeconds(5));

    if (activeHeartBeatTimer != null)
    {
        activeHeartBeatTimer.Stop();
        activeHeartBeatTimer.Dispose();
        activeHeartBeatTimer = null;
    }

    var timer = new Timer();
    timer.Elapsed += PublishHeartBeat;
    timer.Interval = heartBeatTimerMs;
    timer.Enabled = true;

    // Keep a field reference instead of letting the timer go out of scope.
    activeHeartBeatTimer = timer;
}
/// <summary>
/// Timer callback: publishes a heartbeat message for this host on
/// topicThisHost and then checks the other servers.
/// </summary>
/// <param name="source">Event sender; not used.</param>
/// <param name="e">Elapsed event data; not used.</param>
private void PublishHeartBeat(object source, ElapsedEventArgs e)
{
    // Heartbeat text has the form "<prefix>: <host name> <host ip>".
    string heartBeatText = string.Format("{0}: {1} {2}", this.heartBeatPubMsg, MyHostName, hostIp);

    // The publish task is intentionally not awaited.
    _ = this.Publish(heartBeatText, topicThisHost);

    this.CheckOtherServers();
}
/// <summary>
/// Wraps <paramref name="messageIn"/> in an MqttModel, serializes it to
/// indented JSON and enqueues it on the managed publisher client for
/// <paramref name="topic"/>; on success the message is also written to the
/// monitoring log.
/// </summary>
/// <param name="messageIn">Text placed in the model's message field.</param>
/// <param name="topic">MQTT topic to publish on.</param>
/// <param name="pubClient">
/// Client adopted as the publisher when no publisher client has been stored
/// yet; ignored once one is set.
/// </param>
/// <returns>A task that completes once the message has been enqueued and logged.</returns>
/// <exception cref="Exception">
/// Enqueueing or log insertion failed; the original exception is attached as
/// InnerException.
/// </exception>
public async Task Publish(string messageIn, string topic, IManagedMqttClient pubClient = null)
{
    // MQTT QoS numbering: 0 = at most once, 1 = at least once, 2 = exactly once.
    // Unrecognized values keep the previous fallback of AtLeastOnce.
    MqttQualityOfServiceLevel qos;
    switch (MqttQos)
    {
        case 0:
            qos = MqttQualityOfServiceLevel.AtMostOnce;
            break;
        case 1:
            qos = MqttQualityOfServiceLevel.AtLeastOnce;
            break;
        case 2:
            qos = MqttQualityOfServiceLevel.ExactlyOnce;
            break;
        default:
            qos = MqttQualityOfServiceLevel.AtLeastOnce;
            break;
    }

    MqttModel message = new MqttModel();
    message.message = messageIn;
    message.datestamp = DateTime.Now;
    message.source = "";
    message.status = "";

    var payload = JsonConvert.SerializeObject(message, Formatting.Indented);
    var send = new MqttApplicationMessageBuilder()
        .WithTopic(topic)
        .WithPayload(payload)
        .WithQualityOfServiceLevel(qos)
        .WithRetainFlag(false)
        .Build();

    // Adopt the caller-supplied client only when none has been stored yet.
    if (this.managedMqttClientPublisher == null)
    {
        this.managedMqttClientPublisher = pubClient;
    }

    if (this.managedMqttClientPublisher == null)
    {
        applog.Info($"Mqtt Service Publish() method - managedMqttClientPublisher object appears to be NULL");
        return;
    }

    try
    {
        applog.Debug($"Mqtt Service Publish() method - about to pub mqtt message EnqueueAsync() - {messageIn} / {topic} ");
        await this.managedMqttClientPublisher.EnqueueAsync(send);

        MonitoringLogs logs = new MonitoringLogs();
        logs.InsertIntoLog(message);
    }
    catch (Exception ex)
    {
        string errorMessage = $"Exception occured in Publish() method. {ex.Message}";
        applog.Error(errorMessage);
        // Keep the original exception (type and stack trace) as the inner exception.
        throw new Exception(errorMessage, ex);
    }
}
/// <summary>
/// Builds managed-client options for a TLS-secured WebSocket connection to
/// the configured broker, using the service's base client id.
/// </summary>
/// <returns>The built options, or null when building them threw.</returns>
public ManagedMqttClientOptions WsSecureClientOptions()
{
    return WsSecureClientOptions(null);
}

/// <summary>
/// Builds managed-client options for a TLS-secured WebSocket connection to
/// the configured broker.
/// </summary>
/// <param name="clientIdSuffix">
/// Optional text appended to the base client id. An MQTT broker keeps one
/// connection per client id and disconnects the existing connection when
/// another one arrives with the same id ("session taken over"), so every
/// concurrently connected client needs its own id.
/// NOTE(review): the publisher and subscriber start-up code is not visible
/// here - confirm whether both use these options and, if so, pass a distinct
/// suffix for each.
/// </param>
/// <returns>The built options, or null when building them threw.</returns>
public ManagedMqttClientOptions WsSecureClientOptions(string clientIdSuffix)
{
    // CodeBase is a file URI; convert it to a local path rather than
    // trimming a fixed number of leading characters.
    string assemblyPath = Path.GetDirectoryName(
        new Uri(Assembly.GetAssembly(typeof(MqttService)).CodeBase).LocalPath);

    // Building out the secure wss url (both pfx/crt certificate file types appear to work here)
    var url = $"{mqttBrokerAddress}:{mqttPort}/mqtt";

    var filePath = Path.Combine(assemblyPath, CertificateFileName);
    string extension = Path.GetExtension(CertificateFileName);

    // pfx file contains both pub and priv keys (needs pswd); crt file only has pub key (no pswd req'd)
    X509Certificate2 x509Cert = null;
    if (string.Equals(extension, ".pfx", StringComparison.OrdinalIgnoreCase))
    {
        x509Cert = new X509Certificate2(filePath, CertificatePwd);
    }
    else if (string.Equals(extension, ".crt", StringComparison.OrdinalIgnoreCase))
    {
        x509Cert = new X509Certificate2(filePath);
    }
    applog.Debug($"In WsSecureClientOptions(), Certificate Path - {filePath}");

    // Never hand a null entry to the TLS parameters: with an unsupported
    // extension the certificate list is left empty and a warning is logged.
    var certificates = new List<X509Certificate2>();
    if (x509Cert != null)
    {
        certificates.Add(x509Cert);
    }
    else
    {
        applog.Warn($"In WsSecureClientOptions(), unsupported certificate file type '{extension}'; no client certificate loaded");
    }

    var effectiveClientId = string.IsNullOrEmpty(clientIdSuffix)
        ? clientId
        : clientId + clientIdSuffix;

    var clientOptionsBldr = new MqttClientOptionsBuilder()
        .WithProtocolVersion(MqttProtocolVersion.V500)
        .WithWebSocketServer(url)
        .WithCredentials(mqttClientUser, mqttClientPswd)
        .WithClientId(effectiveClientId)
        .WithCleanSession()
        .WithTls(
            new MqttClientOptionsBuilderTlsParameters()
            {
                UseTls = true,
                SslProtocol = System.Security.Authentication.SslProtocols.Tls12,
                Certificates = certificates
            });

    ManagedMqttClientOptions managedClientOptions = null;
    try
    {
        applog.Debug($"In WsSecureClientOptions(), about to Build Publisher - {url}");
        managedClientOptions = new ManagedMqttClientOptionsBuilder()
            .WithClientOptions(clientOptionsBldr)
            .Build();
    }
    catch (Exception ex)
    {
        // Best effort: the failure is logged and null is returned to the caller.
        applog.Error("CERT ERROR ! Exception in WsSecureClientOptions() " + ex.Message);
    }
    return managedClientOptions;
}
/// <summary>
/// Connected handler for the subscriber client; it ignores the event data
/// and completes immediately.
/// </summary>
/// <returns>An already-completed task.</returns>
private Task OnSubscriberConnected(MqttClientConnectedEventArgs _) => Task.CompletedTask;
/// <summary>
/// Disconnected handler for the subscriber client; it ignores the event data
/// (including the disconnect reason) and completes immediately.
/// </summary>
/// <returns>An already-completed task.</returns>
private Task OnSubscriberDisconnected(MqttClientDisconnectedEventArgs _) => Task.CompletedTask;
}