如何管理 MQTTnet 客户端的生命周期?

问题描述 投票:0回答:1

tl;dr:如何避免在另一个线程上使用 MQTTnet 客户端时对其进行处置? 也许这适用于任何

IDisposable
,但在
ManagedMqttClient
的情况下,也存在类似
IsConnected
的调用在异步调用之前要担心。

注意:我们使用的是 MQTTnet v3.0.16。我愿意接受包括“升级到最新版本,然后使用方法 X”的答案

我继承了一个使用

ManagedMqttClient
的应用程序,并在用户更改代理设置时最初替换/处置了该客户端:

using MQTTnet;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Extensions.ManagedClient;
using System;
using System.Threading.Tasks;

internal class OriginalApproach
{
    private IManagedMqttClient _mqttClient;
    private static MqttFactory _factory;

    public OriginalApproach()
    {
        _mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(MqttClientDisconnectedEventArgs => OnDisconnect(MqttClientDisconnectedEventArgs));
    }

    //Called if the user changes settings that affect the way we connect
    //to the broker.
    public async void OnSettingsChange()
    {
        if (_mqttClient != null && _mqttClient.IsConnected)
        {
            StopAsync();
            return;
        }

        //Disposal isn't the only thread safety issue
        if (_mqttClient != null && _mqttClient.IsStarted)
        {
            await Reconnect(TimeSpan.FromSeconds(2));
        }
    }

    public async void StopAsync()
    {
        if (_mqttClient != null)
        {
            await _mqttClient.StopAsync();
            await Task.Delay(System.TimeSpan.FromSeconds(2));
        }
    }

    public async void OnDisconnect(MqttClientDisconnectedEventArgs e)
    {
        await Reconnect(TimeSpan.FromSeconds(5));
    }

    public async Task Reconnect(TimeSpan delay)
    {
        StopAsync();
        await Task.Delay(delay);
        Connect();
    }

    public async void Connect()
    {
        await CreateManagedClient();

        try
        {
            if (!_mqttClient.IsConnected && !_mqttClient.IsStarted)
            {
                StartAsync();
            }
        }
        catch (MQTTnet.Exceptions.MqttCommunicationException ex) { /* ... */  }
        catch (MQTTnet.Exceptions.MqttProtocolViolationException ex) { /* ... */  }
    }

    public async Task<bool> CreateManagedClient()
    {
        try
        {
            if (_mqttClient != null)
                _mqttClient.Dispose();

            _factory = new MqttFactory();
            _mqttClient = _factory.CreateManagedMqttClient();
            await Task.Delay(System.TimeSpan.FromSeconds(2));
        }
        catch (Exception e)
        {
            _mqttClient.Dispose();
            _mqttClient = null;
            return false;
        }
        return true;
    }

    public async void StartAsync()
    {
        MqttApplicationMessage mess = new MqttApplicationMessage();

        mess.Payload = BuildDeathCertificate();
        mess.Topic = "...";

        MqttClientOptionsBuilder clientOptionsBuilder = new MqttClientOptionsBuilder();

        IMqttClientOptions options = clientOptionsBuilder.WithTcpServer("Broker Address", 1234)
                .WithClientId("ABCD")
                .WithCleanSession(true)
                .WithWillMessage(mess)
                .WithKeepAlivePeriod(new System.TimeSpan(1234))
                .WithCommunicationTimeout(new System.TimeSpan(int.MaxValue))
                .Build();

        var managedClientOptions = new ManagedMqttClientOptionsBuilder()
            .WithClientOptions(options)
            .Build();

        if (!_mqttClient.IsStarted && !_mqttClient.IsConnected)
        {
            try
            {
                await _mqttClient.StartAsync(managedClientOptions);
            }
            catch (Exception e) { /* ... */  }
        }
    }

    byte[] BuildDeathCertificate()
    {
        return new byte[1234];
    }

    public async void PublishMessage(byte[] payloadBytes)
    {
        var message = new MqttApplicationMessageBuilder()
            .WithTopic("...")
            .WithPayload(payloadBytes)
            .WithExactlyOnceQoS()
            .WithRetainFlag(false)
            .Build();

        try
        {
            await _mqttClient.PublishAsync(message);
        }
        catch (NullReferenceException e) { /* ... */  }
    }
}

显然,这里存在大量的线程安全问题,并且各种情况都产生了

ObjectDisposed
异常。

我在应用程序的生命周期中使用了一个

ManagedMqttClient

internal class SingleClientTest
{
    private IManagedMqttClient _mqttClient;
    public SingleClientTest()
    {
        var factory = new MqttFactory();

        //Used for lifetime of application
        _mqttClient = factory.CreateManagedMqttClient();
    }

    public async void Connect()
    {
        //No longer calling CreateManagedClient() here

        try
        {
            if (!_mqttClient.IsConnected && !_mqttClient.IsStarted)
            {
                StartAsync();
            }
        }
        catch (MQTTnet.Exceptions.MqttCommunicationException ex) { /* ... */  }
        catch (MQTTnet.Exceptions.MqttProtocolViolationException ex) { /* ... */  }
    }

    //The other methods are mostly unchanged
}

总的来说,它解决了

ObjectDisposed
问题,但它没有解决在异步调用之前调用
IsConnected
的线程安全性。而且,考虑到
MqttFactory
的存在,重用一个客户端感觉就像是一种黑客攻击。另外,我遇到了一个有点像“这个问题”的用例。具体来说,尽管 StartAsync() 为 false,但
IsStarted
产生了异常“托管客户端已启动”。如果需要,我可以提供更多细节,但现在我会避免混淆问题。
我还尝试在对客户端的调用周围添加 

lock

,但是

它们不能在等待的调用周围使用
,因为存在死锁风险。 最后,我通读了 MQTTnet 示例、wiki、一些问题,并浏览了一些代码。到目前为止,我还没有在库中找到额外的并发机制。

我正在探索一些选项(也许是这些选项的组合):

在对客户端的所有调用中使用
    SemaphorSlim
  1. ,如
    此处
    所述 - 看起来它可以解决 awaited 调用。这就是我的倾向;不确定这是否会引入新的计时问题。
    使用 
  2. MqttClient
  3. ,而不是
    ManagedMqttClient
    。 [此线程][1] 听起来好像
    MqttClient
    是首选。我应该使用它吗?在应用程序的生命周期中使用一个
    MqttClient
    是否合理(当代理设置更改时使用
    DisconnectAsync()/ConnectAsync()
    )? (这仍然没有解决像
    _mqttClient.IsConnected
    这样的检查)
    对于 
  4. try/catch
  5. 异常,用
    ObjectDisposed
    包围对客户端对象的每次调用,并像这样替换客户端:
    
    
  6. var oldClient = _mqttClient _mqttClient = _factory.CreateManagedMqttClient(); oldClient?.Dispose();
再次强调,这并不涉及像 
_mqttClient.IsConnected

这样的检查。

只是想知道是否有人可以提供有关普遍接受的这样做方式的见解。

c# multithreading async-await thread-safety mqttnet
1个回答
0
投票

使用单个客户端的方式看起来最适用于多线程应用程序。只是您需要专门为您的应用程序创建一个全功能包装器,以防止访问“危险”方法,并且主要仅提供必要的功能。

您的客户端应该在单独的线程中运行,并提供启动、关闭和重新启动的方法,以便在应用程序主周期中使用。

客户还应该配备一个自我控制的看门狗。

© www.soinside.com 2019 - 2024. All rights reserved.