我正在尝试使用 ActiveMQ 集群和虚拟主题来创建一个高可用的消息系统,用于 K8s 集群内的服务间通信,并且在故障转移与非故障转移模式下启动连接时遇到了问题。
我正在使用 NMS v2.0.0 和 NMS.ActiveMQ v 2.0.1(因为我在使用这两个版本的 2.1 版本时遇到了其他问题)。
我有代码将启动一个任务,然后将为我的生产者生成 4-5 个子任务,每个子任务负责创建
IConnection
、ISession
和 IProducer
实例(在Polly) 使用连接字符串 activemq:tcp://localhost:61616
。该代码按预期工作。
使用相同的代码,唯一的更改是在启动任务时使用故障转移连接字符串
activemq:failover:(activemq:tcp://localhost:61616,activemq:tcp://localhost:61617,activemq:tcp://localhost:61618)
,在 StartAsync
块上调用 IConnection
。
我尝试将
IConnection.StartAsync()
调用移至其自己的任务中,但是随后发生的情况是对 IConnection.CreateSessionAsync()
块的调用。
我的代码的(片段)版本如下:
foreach (var type in EventUtil.GetAllEventTypes())
{
var initialisationSignal = new ManualResetEventSlim();
tasks.Add(new Tuple<Task, ManualResetEventSlim>(Task.Run(async () =>
{
IConnection connection = null;
ISession session = null;
IMessageProducer producer = null;
try
{
IConnectionFactory factory = new NMSConnectionFactory(_options.ActiveMqUri);
connection = await factory.CreateConnectionAsync();
await connection.StartAsync();
session = await connection.CreateSessionAsync();
var topicName = $"topic://VirtualTopic.{ActiveMqOptions.TopicNamer(type)}";
var destination = (ITopic)SessionUtil.GetDestination(session, topicName);
_logger.LogInformation("Creating producer for Event topic {TopicName}", topicName);
producer = await session.CreateProducerAsync(destination);
await RunProducerTask(type, producer, session, serviceCancel.Token);
}
finally
{
await CloseQuietly(producer, _logger);
await CloseQuietly(session, _logger);
await CloseQuietly(connection, _logger);
}
我尝试按照指南设置故障转移,但是使用
failover:(activemq:tcp://localhost:61616,activemq:tcp://localhost:61617,activemq:tcp://localhost:61618)
语法会生成 Apache.NMS.NMSConnectionException: No IConnectionFactory implementation found for connection URI: failover:(
。
我可以对单代理 URI 使用相同的代码,并且我已经测试了它连接到三个代理中的每一个,并且它们按预期正常工作,对
StartAsync()
和 CreateSessionAsync()
的调用不会阻塞。
在进行故障转移时,这些 API 会被阻止,因为只有在客户端连接并从代理提供了基本的电汇协商数据后,它们才能继续进行。如果您使用的 URI 在这些 API 中停滞不前,那么它显然是错误的。 NMS 客户端的维护不是很积极,因此您最有可能自行修复和支持它们。
我可以为您提供的最简单的修复建议是不要使用 NMSConnectionFactory,它查找传输提供程序的魔力并不大,而是直接转到不需要“activemq”的 IConnectionFactory 的 NMS.ActiveMQ 库实现。 URI 上的前缀,因为它知道其提供者是什么。