管理多个队列/主题客户端?

问题描述 投票:1回答:2

这里的挑战是我正在尝试创建一个单一的外观来处理队列和主题,同时保持SendPublish的语义

例如:

public interface IServiceBus
{
    Task Send<T>(T message, string destination, SendOptions options = null);
    Task Publish<T>(T message, string topic, SendOptions options = null);
}

Send()会向队列发送消息,Publish()会向主题发布消息。所以我需要有一个IQueueClientITopicClient的实例来实现这些目标;我会将这些注入我的IServiceBus实现作为依赖项并相应地调用它们。

问题是QueueClient和TopicClients要求您在新建客户端时指定其目标,这阻止我将其作为我的IServiceBus实现的参数。

我可以在创建消息时创建一个客户端,但这样效率极低。我四处寻找至少一个可以作为客户工厂的连接管理器,但MessagingFactory似乎不在这个SDK中(Microsoft.Azure.ServiceBus 3.4.0)。

所以问题是 - 我可以使用哪种工厂,这样我就可以按需创建适当的客户,并且可以通过重复使用客户获得相同的效率? - 我应该使用某种覆盖或替代客户端对象来实现这种效果吗?这两个客户真的是有限的。

azureservicebus azure-servicebus-topics
2个回答
2
投票

因为我相信我们可以假设QueueClient and TopicClient are thread safe的实例,你可以做的是将已解析的IServiceBus具体类注册为你的IoC容器中的单例。

在具体的ServiceBus中,您可以创建以前看到的主题和队列客户端的缓存:

private readonly ConcurrentDictionary<string, ITopicClient> _topicClientCache
    = new ConcurrentDictionary<string, ITopicClient>();
private readonly ConcurrentDictionary<string, IQueueClient> _queueClientCache
    = new ConcurrentDictionary<string, IQueueClient>();

然后在你的Publish方法

public async Task Publish<T>(T message, string destination, ...)
{
    // i.e. destination is the topic
    if (!_topicClientCache.TryGetValue(destination, out var topicClient))
    {
        topicClient = new TopicClient(_myServiceBusConnectionString, destination);
        _topicClientCache.TryAdd(destination, topicClient);
    }
    ... create and serialize message into azureMessage here
    await topicClient.SendAsync(azureMessage);
}

这同样适用于你的Send实现 - 它将检查_queueClientCache的目标(队列名称),并创建它并在它第一次看到它时缓存它。


0
投票

我终于遇到了一个有类似问题的人。事实证明,他们删除了MessagingFactory,但使连接可重用。每个客户端都有一个构造函数重载,它接受连接,因此我将连接注册为单例并注入而不是客户端,然后只需按需创建客户端。

见:https://github.com/Azure/azure-service-bus-dotnet/issues/556

我的解决方案看起来有点像这样(为简洁省略了完整的实现)

public class AzureServiceBus : IServiceBus
{
    public AzureServiceBus(ServiceBusConnection connection, string replyTo)
    {
        _connection = connection;
        _replyTo = replyTo;
        _retryPolicy = new RetryExponential(
            TimeSpan.FromSeconds(1),
            TimeSpan.FromMinutes(1),
            10);
    }

    public async Task Send<T>(T message, string destination)
    {
        var client = new QueueClient(_connection, destination, ReceiveMode.PeekLock, _retryPolicy);

        // ... do work
    }

    public async Task Publish<T>(T message, string topic, SendOptions options = null)
    {
        var client = new TopicClient(_connection, topic, _retryPolicy);

        // ... do work
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.