tl;dr:如何避免在另一个线程上使用 MQTTnet 客户端时对其进行处置? 也许这适用于任何
IDisposable
,但在 ManagedMqttClient
的情况下,也存在类似 IsConnected
的调用在异步调用之前要担心。
注意:我们使用的是 MQTTnet v3.0.16。我愿意接受包括“升级到最新版本,然后使用方法 X”的答案
我继承了一个使用
ManagedMqttClient
的应用程序,并在用户更改代理设置时最初替换/处置了该客户端:
using MQTTnet;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Extensions.ManagedClient;
using System;
using System.Threading.Tasks;
internal class OriginalApproach
{
private IManagedMqttClient _mqttClient;
private static MqttFactory _factory;
public OriginalApproach()
{
_mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(MqttClientDisconnectedEventArgs => OnDisconnect(MqttClientDisconnectedEventArgs));
}
//Called if the user changes settings that affect the way we connect
//to the broker.
public async void OnSettingsChange()
{
if (_mqttClient != null && _mqttClient.IsConnected)
{
StopAsync();
return;
}
//Disposal isn't the only thread safety issue
if (_mqttClient != null && _mqttClient.IsStarted)
{
await Reconnect(TimeSpan.FromSeconds(2));
}
}
public async void StopAsync()
{
if (_mqttClient != null)
{
await _mqttClient.StopAsync();
await Task.Delay(System.TimeSpan.FromSeconds(2));
}
}
public async void OnDisconnect(MqttClientDisconnectedEventArgs e)
{
await Reconnect(TimeSpan.FromSeconds(5));
}
public async Task Reconnect(TimeSpan delay)
{
StopAsync();
await Task.Delay(delay);
Connect();
}
public async void Connect()
{
await CreateManagedClient();
try
{
if (!_mqttClient.IsConnected && !_mqttClient.IsStarted)
{
StartAsync();
}
}
catch (MQTTnet.Exceptions.MqttCommunicationException ex) { /* ... */ }
catch (MQTTnet.Exceptions.MqttProtocolViolationException ex) { /* ... */ }
}
public async Task<bool> CreateManagedClient()
{
try
{
if (_mqttClient != null)
_mqttClient.Dispose();
_factory = new MqttFactory();
_mqttClient = _factory.CreateManagedMqttClient();
await Task.Delay(System.TimeSpan.FromSeconds(2));
}
catch (Exception e)
{
_mqttClient.Dispose();
_mqttClient = null;
return false;
}
return true;
}
public async void StartAsync()
{
MqttApplicationMessage mess = new MqttApplicationMessage();
mess.Payload = BuildDeathCertificate();
mess.Topic = "...";
MqttClientOptionsBuilder clientOptionsBuilder = new MqttClientOptionsBuilder();
IMqttClientOptions options = clientOptionsBuilder.WithTcpServer("Broker Address", 1234)
.WithClientId("ABCD")
.WithCleanSession(true)
.WithWillMessage(mess)
.WithKeepAlivePeriod(new System.TimeSpan(1234))
.WithCommunicationTimeout(new System.TimeSpan(int.MaxValue))
.Build();
var managedClientOptions = new ManagedMqttClientOptionsBuilder()
.WithClientOptions(options)
.Build();
if (!_mqttClient.IsStarted && !_mqttClient.IsConnected)
{
try
{
await _mqttClient.StartAsync(managedClientOptions);
}
catch (Exception e) { /* ... */ }
}
}
byte[] BuildDeathCertificate()
{
return new byte[1234];
}
public async void PublishMessage(byte[] payloadBytes)
{
var message = new MqttApplicationMessageBuilder()
.WithTopic("...")
.WithPayload(payloadBytes)
.WithExactlyOnceQoS()
.WithRetainFlag(false)
.Build();
try
{
await _mqttClient.PublishAsync(message);
}
catch (NullReferenceException e) { /* ... */ }
}
}
显然,这里存在大量的线程安全问题,并且各种情况都产生了
ObjectDisposed
异常。
我在应用程序的生命周期中使用了一个
ManagedMqttClient
:
internal class SingleClientTest
{
private IManagedMqttClient _mqttClient;
public SingleClientTest()
{
var factory = new MqttFactory();
//Used for lifetime of application
_mqttClient = factory.CreateManagedMqttClient();
}
public async void Connect()
{
//No longer calling CreateManagedClient() here
try
{
if (!_mqttClient.IsConnected && !_mqttClient.IsStarted)
{
StartAsync();
}
}
catch (MQTTnet.Exceptions.MqttCommunicationException ex) { /* ... */ }
catch (MQTTnet.Exceptions.MqttProtocolViolationException ex) { /* ... */ }
}
//The other methods are mostly unchanged
}
总的来说,它解决了
ObjectDisposed
问题,但它没有解决在异步调用之前调用 IsConnected
的线程安全性。而且,考虑到 MqttFactory
的存在,重用一个客户端感觉就像是一种黑客攻击。另外,我遇到了一个有点像“这个问题”的用例。具体来说,尽管 StartAsync()
为 false,但 IsStarted
产生了异常“托管客户端已启动”。如果需要,我可以提供更多细节,但现在我会避免混淆问题。我还尝试在对客户端的调用周围添加 lock
,但是
它们不能在等待的调用周围使用,因为存在死锁风险。 最后,我通读了 MQTTnet 示例、wiki、一些问题,并浏览了一些代码。到目前为止,我还没有在库中找到额外的并发机制。
我正在探索一些选项(也许是这些选项的组合):
在对客户端的所有调用中使用
SemaphorSlim
此处所述 - 看起来它可以解决
await
ed 调用。这就是我的倾向;不确定这是否会引入新的计时问题。使用 MqttClient
ManagedMqttClient
。 [此线程][1] 听起来好像 MqttClient
是首选。我应该使用它吗?在应用程序的生命周期中使用一个 MqttClient
是否合理(当代理设置更改时使用 DisconnectAsync()/ConnectAsync()
)? (这仍然没有解决像_mqttClient.IsConnected
这样的检查)对于 try/catch
ObjectDisposed
包围对客户端对象的每次调用,并像这样替换客户端:
var oldClient = _mqttClient
_mqttClient = _factory.CreateManagedMqttClient();
oldClient?.Dispose();
再次强调,这并不涉及像
_mqttClient.IsConnected
这样的检查。
只是想知道是否有人可以提供有关普遍接受的这样做方式的见解。
使用单个客户端的方式看起来最适用于多线程应用程序。只是您需要专门为您的应用程序创建一个全功能包装器,以防止访问“危险”方法,并且主要仅提供必要的功能。
您的客户端应该在单独的线程中运行,并提供启动、关闭和重新启动的方法,以便在应用程序主周期中使用。
客户还应该配备一个自我控制的看门狗。