Apache NMS 和故障转移:URI 阻塞与非阻塞

问题描述 投票:0回答:1

我正在尝试使用 ActiveMQ 集群和虚拟主题来创建一个高可用的消息系统,用于 K8s 集群内的服务间通信,并且在故障转移与非故障转移模式下启动连接时遇到了问题。

我正在使用 NMS v2.0.0 和 NMS.ActiveMQ v 2.0.1(因为我在使用这两个版本的 2.1 版本时遇到了其他问题)。

我有代码将启动一个任务,然后将为我的生产者生成 4-5 个子任务,每个子任务负责创建

IConnection
ISession
IProducer
实例(在Polly) 使用连接字符串
activemq:tcp://localhost:61616
。该代码按预期工作。

使用相同的代码,唯一的更改是在启动任务时使用故障转移连接字符串

activemq:failover:(activemq:tcp://localhost:61616,activemq:tcp://localhost:61617,activemq:tcp://localhost:61618)
,在
StartAsync
块上调用
IConnection

我尝试将

IConnection.StartAsync()
调用移至其自己的任务中,但是随后发生的情况是对
IConnection.CreateSessionAsync()
块的调用。

我的代码的(片段)版本如下:

            foreach (var type in EventUtil.GetAllEventTypes())
            {                    
                var initialisationSignal = new ManualResetEventSlim();

                tasks.Add(new Tuple<Task, ManualResetEventSlim>(Task.Run(async () =>
                {
                    IConnection connection = null;
                    ISession session = null;
                    IMessageProducer producer = null;
                    
                    try
                    {
                        IConnectionFactory factory = new NMSConnectionFactory(_options.ActiveMqUri); 
                        connection = await factory.CreateConnectionAsync();
                        await connection.StartAsync();
            
                        session = await connection.CreateSessionAsync();

                        var topicName = $"topic://VirtualTopic.{ActiveMqOptions.TopicNamer(type)}";
                        var destination = (ITopic)SessionUtil.GetDestination(session, topicName);

                        _logger.LogInformation("Creating producer for Event topic {TopicName}", topicName);

                        producer = await session.CreateProducerAsync(destination);
                        await RunProducerTask(type, producer, session, serviceCancel.Token);
                    }
                    finally
                    {    
                        await CloseQuietly(producer, _logger);
                        await CloseQuietly(session, _logger);
                        await CloseQuietly(connection, _logger);
                    }

我尝试按照指南设置故障转移,但是使用

failover:(activemq:tcp://localhost:61616,activemq:tcp://localhost:61617,activemq:tcp://localhost:61618)
语法会生成
Apache.NMS.NMSConnectionException: No IConnectionFactory implementation found for connection URI: failover:(

我可以对单代理 URI 使用相同的代码,并且我已经测试了它连接到三个代理中的每一个,并且它们按预期正常工作,对

StartAsync()
CreateSessionAsync()
的调用不会阻塞。

activemq apache-nms
1个回答
0
投票

在进行故障转移时,这些 API 会被阻止,因为只有在客户端连接并从代理提供了基本的电汇协商数据后,它们才能继续进行。如果您使用的 URI 在这些 API 中停滞不前,那么它显然是错误的。 NMS 客户端的维护不是很积极,因此您最有可能自行修复和支持它们。

我可以为您提供的最简单的修复建议是不要使用 NMSConnectionFactory,它查找传输提供程序的魔力并不大,而是直接转到不需要“activemq”的 IConnectionFactory 的 NMS.ActiveMQ 库实现。 URI 上的前缀,因为它知道其提供者是什么。

© www.soinside.com 2019 - 2024. All rights reserved.