我们正在开发一个.Net Core服务,该服务将托管在Azure Service Fabric中。此SF服务需要通过其AMQP 1.0 SSL TLS端点与在Azure IoT Hub中注册的10,000个设备进行交互。每个IoT Hub设备都有自己的安全令牌和IoT Hub服务提供的连接字符串。
对于我们的场景,我们需要收听来自10,000 IoT Hub设备实例的所有云到设备消息,并将这些消息“路由”到该字段中实际“网关”监听的中央服务总线主题。所以基本上我们想要将来自10,000个服务总线队列的消息转发到一个中央队列中。
从SF服务处理这10,000个AMQP列表的最佳方法是什么?有没有办法可以重用AMQP连接,会话或链接,以便我们缓存/共享资源?我们如何在SF集群中的5个节点上动态分散连接维护负载?
我们正在评估这些Nuget包的实现:Microsoft.Azure.ServiceBus
AMQPNetLite
Microsoft.Azure.Devices.Client
我们正在使用Microsoft.Azure.Devices.Client
lib进行一些测试,请参阅下面的简化代码示例:
using System;
using System.Fabric;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.ServiceFabric.Services.Runtime;
namespace ID.Monitoring.MonServer.ServiceFabric.ServiceBus
{
/// <summary>
/// An instance of this class is created for each service instance by the Service Fabric runtime.
/// </summary>
internal sealed class ServiceBus : StatelessService
{
private readonly DeviceClient _deviceClient;
private ConnectionStatus _status;
public ServiceBus(StatelessServiceContext context)
: base(context)
{
_deviceClient = DeviceClient.CreateFromConnectionString("HostName=id-monitoring-dev.azure-devices.net;DeviceId=100;SharedAccessSignature=SharedAccessSignature sr=id-monitoring-dev.azure-devices.net%2Fdevices%2F100&sig={token}&se=1553265888", TransportType.Amqp_Tcp_Only);
}
/// <summary>
/// This is the main entry point for your service instance.
/// </summary>
/// <param name="cancellationToken">Canceled when Service Fabric needs to shut down this service instance.</param>
protected override async Task RunAsync(CancellationToken cancellationToken)
{
_deviceClient.SetConnectionStatusChangesHandler(ConnectionStatusChangeHandler);
while (true)
{
if (_status != ConnectionStatus.Connected)
{
await _deviceClient.OpenAsync();
}
var receivedMessage = await _deviceClient.ReceiveAsync(TimeSpan.FromSeconds(10)).ConfigureAwait(false);
if (receivedMessage != null)
{
var messageData = Encoding.ASCII.GetString(receivedMessage.GetBytes());
//TODO: handle incoming message and publish to common
await _deviceClient.CompleteAsync(receivedMessage).ConfigureAwait(false);
}
}
}
private void ConnectionStatusChangeHandler(ConnectionStatus status, ConnectionStatusChangeReason reason)
{
_status = status;
}
}
}
问题:这是否可以很好地扩展到10,000个Service Fabric服务实例?或者是否有更有效的方法可以从Service Fabric Service环境维护这么多AMQP服务总线列表器?有没有办法可以应用AMQP连接多路复用?
看看this。
第二个答案提供了一个示例,允许您将多个设备复用到一个Amqp连接中。
您选择监控设备的方法将无法很好地扩展,并且难以维护。
目前,服务结构有一个limitation,表示您可以在单个节点中放置多少个实例。例如:如果使用ServiceBus服务创建应用程序并跨越10000个实例,则会遇到此限制,即节点数。即:如果您有5节点群集,则可以使用默认缩放方法仅运行5个服务实例。
要绕过此问题,您有一些选择:
分区:
要使单个无状态服务运行的分区数多于节点数,您必须对服务进行分区。假设您有一个5节点集群并需要10000个实例,则每个节点上需要运行2000个分区。如果你使用共享进程并且有足够的内存,这种方法可能对你有帮助,请在采用这种方法之前先看看this thread和this thread
多命名服务:
命名服务是一种服务类型的运行服务定义,在这种情况下,您将为每个设备创建一个服务。喜欢:
- ServiceBusType ServiceBus,Device1的 ServiceBus-设备2 ServiceBus-设备3
这种方法会占用机器中过多的资源,因为您将为每个设备运行一个实例,但易于管理,因为您可以跨越每个新设备的新实例,而不会影响其他正在运行的服务。
每个实例的并行处理:
每个实例都负责同时处理多个消息,在这种情况下,您将为每个实例创建2000个连接(如果在每个集群的5个实例/节点中运行)。这将比资源消耗的其他方法更轻,但是维护起来有点困难,因为您必须自己处理余额,并且可能需要额外的服务来监视任务并将任务委派给所有服务并确保消息正在处理均匀。
摘要:
一次处理一个消息的一个实例将需要10000个服务实例,分区将类似,但您可以使用共享进程来减少内存消耗,但两种情况下内存消耗仍然很高。
如果服务数量不是太高,则可以选择多个命名服务,您也无法共享连接。因此,我不会为您的方案推荐此方法。
第三种选择是资源更友好,但您必须找到一种在整个群集节点中均匀分区连接的方法。
您还可以使用混合方法,例如,您可以并行处理多个消息的服务和分区服务,以定义设备的关键范围。
请查看我提到的链接。
我发现有一个DeviceClient构造函数允许设置AmqpConnectionPoolSettings。