我在 .NET Core 控制台应用程序中尝试通过 NMS 建立与 ActiveMQ 的连接时遇到异常

问题描述 投票:0回答:1

我正在尝试创建简单的概念验证控制台应用程序,该应用程序向队列生成 XML 文件消息,然后使用该队列中的消息。问题是我无法通过 .CreateConnection() 方法创建连接。当我尝试这样做时,我得到了这个异常:

Unhandled exception. Apache.NMS.NMSConnectionException: Error creating transport.
 ---> System.Exception: NewInstance failed to find a match for id = tcp
   at Apache.NMS.ActiveMQ.Transport.TransportFactory.NewInstance(String scheme)
   at Apache.NMS.ActiveMQ.Transport.TransportFactory.CreateTransportFactory(Uri location)

这是我的program.cs文件:

using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Microsoft.Extensions.Configuration;

class Program
{
    static void Main(String[] args)
    {
        var builder = new ConfigurationBuilder();
        String? brokerUri = "tcp://localhost:61616";
        String? requestQueueName = "newQueue";
        String filePath = "C:\\Users\\asasy\\OneDrive\\Desktop\\Dokumenciki\\Tauron\\ActiveMQ_Sender\\messages\\files.xml";
        String xmlResponseContent = String.Empty;

            xmlResponseContent = System.IO.File.ReadAllText(filePath);
            // Send messages to the queue
            Console.WriteLine(xmlResponseContent);
            SendMessage(brokerUri, requestQueueName, xmlResponseContent);
        Thread.Sleep(120);
        // Receive messages from the queue
        ReceiveMessages(brokerUri, "RequestQueue");
        Console.WriteLine("Press any key to exit...");
        Console.ReadKey();
    }
    static void SendMessage(String brokerUri, String queueName, String message)
    {
        Apache.NMS.IConnectionFactory factory = new ConnectionFactory(new Uri(brokerUri));
        
        using (IConnection connection = factory.CreateConnection())
        {
            try
            {
                connection.Start();
                using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    IDestination destination = session.GetQueue(queueName);
                    using (IMessageProducer producer = session.CreateProducer(destination))
                    {
                        ITextMessage textMessage = producer.CreateTextMessage(message);
                        producer.Send(textMessage);
                        Console.WriteLine("Message sent: " + textMessage.Text);
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("Error occurred: " + ex.Message);
            }
            finally
            {
                if (connection != null && connection.IsStarted)
                {
                    connection.Stop();
                    connection.Close();
                }
            }
        }
    }
    static void ReceiveMessages(String brokerUri, String queueName)
    {
        IConnectionFactory factory = new NMSConnectionFactory(brokerUri);
        using (IConnection connection = factory.CreateConnection())
        {
            try
            {
                connection.Start();
                using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    IDestination destination = session.GetQueue(queueName);
                    using (IMessageConsumer consumer = session.CreateConsumer(destination))
                    {
                        while (true)
                        {
                            IMessage message = consumer.Receive();
                            if (message is ITextMessage textMessage)
                            {
                                Console.WriteLine("Received message: " + textMessage.Text);
                            }
                            else if (message == null)
                            {
                                // No more messages in the queue
                                break;
                            }
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("Error occurred: " + ex.Message);
            }
            finally
            {
                if (connection != null && connection.IsStarted)
                {
                    connection.Stop();
                    connection.Close();
                }
            }
        }
    }
}

这是我的 activemq.xml 文件中的 TransportConnectors

        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

我尝试使用不同的协议,例如 stomp,但这并没有真正帮助。我尝试禁用防火墙和防病毒软件,希望这会有所帮助,但没有。

我查看了文档和源代码以了解“无法找到 id = tcp 的匹配”消息,但我没有找到任何内容。

c# .net .net-core console-application activemq
1个回答
0
投票

使用 NMS API 工厂时

NMSConnectionFactory
您需要指定要使用的协议库,在您的情况下为“activemq”,因此您的 URI 看起来像这样

activemq:tcp://localhost:61616

并且您的项目中必须有对 NMS.ActiveMQ 库的引用,以便 NMSConnectionFactory 可以找到它。

© www.soinside.com 2019 - 2024. All rights reserved.