这里的挑战是我正在尝试创建一个单一的外观来处理队列和主题,同时保持Send
与Publish
的语义
例如:
public interface IServiceBus
{
Task Send<T>(T message, string destination, SendOptions options = null);
Task Publish<T>(T message, string topic, SendOptions options = null);
}
Send()
会向队列发送消息,Publish()
会向主题发布消息。所以我需要有一个IQueueClient
和ITopicClient
的实例来实现这些目标;我会将这些注入我的IServiceBus
实现作为依赖项并相应地调用它们。
问题是QueueClient和TopicClients要求您在新建客户端时指定其目标,这阻止我将其作为我的IServiceBus
实现的参数。
我可以在创建消息时创建一个客户端,但这样效率极低。我四处寻找至少一个可以作为客户工厂的连接管理器,但MessagingFactory
似乎不在这个SDK中(Microsoft.Azure.ServiceBus 3.4.0)。
所以问题是 - 我可以使用哪种工厂,这样我就可以按需创建适当的客户,并且可以通过重复使用客户获得相同的效率? - 我应该使用某种覆盖或替代客户端对象来实现这种效果吗?这两个客户真的是有限的。
因为我相信我们可以假设QueueClient and TopicClient are thread safe的实例,你可以做的是将已解析的IServiceBus
具体类注册为你的IoC容器中的单例。
在具体的ServiceBus中,您可以创建以前看到的主题和队列客户端的缓存:
private readonly ConcurrentDictionary<string, ITopicClient> _topicClientCache
= new ConcurrentDictionary<string, ITopicClient>();
private readonly ConcurrentDictionary<string, IQueueClient> _queueClientCache
= new ConcurrentDictionary<string, IQueueClient>();
然后在你的Publish
方法
public async Task Publish<T>(T message, string destination, ...)
{
// i.e. destination is the topic
if (!_topicClientCache.TryGetValue(destination, out var topicClient))
{
topicClient = new TopicClient(_myServiceBusConnectionString, destination);
_topicClientCache.TryAdd(destination, topicClient);
}
... create and serialize message into azureMessage here
await topicClient.SendAsync(azureMessage);
}
这同样适用于你的Send
实现 - 它将检查_queueClientCache
的目标(队列名称),并创建它并在它第一次看到它时缓存它。
我终于遇到了一个有类似问题的人。事实证明,他们删除了MessagingFactory,但使连接可重用。每个客户端都有一个构造函数重载,它接受连接,因此我将连接注册为单例并注入而不是客户端,然后只需按需创建客户端。
见:https://github.com/Azure/azure-service-bus-dotnet/issues/556
我的解决方案看起来有点像这样(为简洁省略了完整的实现)
public class AzureServiceBus : IServiceBus
{
public AzureServiceBus(ServiceBusConnection connection, string replyTo)
{
_connection = connection;
_replyTo = replyTo;
_retryPolicy = new RetryExponential(
TimeSpan.FromSeconds(1),
TimeSpan.FromMinutes(1),
10);
}
public async Task Send<T>(T message, string destination)
{
var client = new QueueClient(_connection, destination, ReceiveMode.PeekLock, _retryPolicy);
// ... do work
}
public async Task Publish<T>(T message, string topic, SendOptions options = null)
{
var client = new TopicClient(_connection, topic, _retryPolicy);
// ... do work
}
}