如何将Azure Service Bus与Qpid Proton Java连接?

问题描述 投票:0回答:1

我需要连接Azure服务总线来发送消息。我按照给定的示例,仅将主机名更改为

amqps://XXX:[email protected]/queueName

运行 Send 类时,出现以下错误。我尝试了很多但无法解决问题。

线程“main”中的异常 org.apache.qpid.proton.engine.HandlerException: java.lang.IllegalStateException:没有为连接提供地址 在 org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:112) 在 org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324) 在 org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:292) 在 org.apache.qpid.proton.reactor.impl.ReactorImpl.run(ReactorImpl.java:357) 在 org.apache.qpid.proton.example.reactor.Send.main(Send.java:151) 导致:java.lang.IllegalStateException:没有提供地址 连接于 org.apache.qpid.proton.reactor.impl.IOHandler.handleBound(IOHandler.java:155) 在 org.apache.qpid.proton.reactor.impl.IOHandler.onUnhandled(IOHandler.java:380) 在 org.apache.qpid.proton.engine.BaseHandler.onConnectionBound(BaseHandler.java:58) 在 org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:131) 在 org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108) ...还有4个

示例代码:

package org.apache.qpid.proton.example.reactor;

import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.logging.LogManager;

import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.reactor.Handshaker;
import org.apache.qpid.proton.reactor.Reactor;

// This is a send in terms of low level AMQP events.
public class Send extends BaseHandler {

    private class SendHandler extends BaseHandler {

        private final Message message;
        private int nextTag = 0;

        private SendHandler(Message message) {
            this.message = message;

            // Add a child handler that performs some default handshaking
            // behaviour.
            add(new Handshaker());
        }

        @Override
        public void onConnectionInit(Event event) {
            Connection conn = event.getConnection();

            // Every session or link could have their own handler(s) if we
            // wanted simply by adding the handler to the given session
            // or link
            Session ssn = conn.session();

            // If a link doesn't have an event handler, the events go to
            // its parent session. If the session doesn't have a handler
            // the events go to its parent connection. If the connection
            // doesn't have a handler, the events go to the reactor.
            Sender snd = ssn.sender("sender");
            conn.open();
            ssn.open();
            snd.open();
        }

        @Override
        public void onLinkFlow(Event event) {
            Sender snd = (Sender)event.getLink();
            if (snd.getCredit() > 0) {
                byte[] msgData = new byte[1024];
                int length;
                while(true) {
                    try {
                        length = message.encode(msgData, 0, msgData.length);
                        break;
                    } catch(BufferOverflowException e) {
                        msgData = new byte[msgData.length * 2];
                    }
                }
                byte[] tag = String.valueOf(nextTag++).getBytes();
                Delivery dlv = snd.delivery(tag);
                snd.send(msgData, 0, length);
                dlv.settle();
                snd.advance();
                snd.close();
                snd.getSession().close();
                snd.getSession().getConnection().close();
            }
        }

        @Override
        public void onTransportError(Event event) {
            ErrorCondition condition = event.getTransport().getCondition();
            if (condition != null) {
                System.err.println("Error: " + condition.getDescription());
            } else {
                System.err.println("Error (no description returned).");
            }
        }
    }

    private final String host;
    private final int port;
    private final Message message;

    private Send(String host, int port, String content) {
        this.host = host;
        this.port = port;
        message = Proton.message();
        message.setBody(new AmqpValue(content));
    }

    @Override
    public void onReactorInit(Event event) {
        // You can use the connection method to create AMQP connections.

        // This connection's handler is the SendHandler object. All the events
        // for this connection will go to the SendHandler object instead of
        // going to the reactor. If you were to omit the SendHandler object,
        // all the events would go to the reactor.
        event.getReactor().connectionToHost(host, port, new SendHandler(message));
         //event.getReactor().connectionToHost(host, port, null);
    }

    public static void main(String[] args) throws IOException {
        int port = 5672;
        String host = "amqps://XXX:[email protected]/queueName"; // Only changed hostname

        if (args.length > 0) {
            String[] parts = args[0].split(":", 2);
            host = parts[0];
            if (parts.length > 1) {
                port = Integer.parseInt(parts[1]);
            }
        }
        String content = args.length > 1 ? args[1] : "Hello World!";

        Reactor r = Proton.reactor(new Send(host, port, content));
        r.run();

    }

}
java azureservicebus qpid
1个回答
0
投票

有点晚了,对于图书馆的更现代版本,但我自己很难找到这个答案。请参阅下面的基本示例,该示例从今天起可与 Qpid Proton-J2 一起使用,该示例源自此处的示例代码。

package com;
import org.apache.qpid.protonj2.client.Client;
import org.apache.qpid.protonj2.client.Connection;
import org.apache.qpid.protonj2.client.ConnectionOptions;
import org.apache.qpid.protonj2.client.Delivery;
import org.apache.qpid.protonj2.client.Message;
import org.apache.qpid.protonj2.client.Receiver;
import org.apache.qpid.protonj2.client.Sender;
import org.apache.qpid.protonj2.client.exceptions.ClientException;

public class Test {

    public static void main(String[] args) throws ClientException {
        final String serverHost = System.getProperty("HOST", "YOUR_SERVICEBUS_NAMESPACE_HERE.servicebus.windows.net");
        final int serverPort = Integer.getInteger("PORT", 5671);
        final String address = System.getProperty("ADDRESS", "YOUR_QUEUE_NAME_HERE");

        final Client client = Client.create();

        final ConnectionOptions options = new ConnectionOptions();
        options.sslOptions().sslEnabled(true);
        options.user("RootManageSharedAccessKey");
        options.password("YOUR_SHARED_ACCESS_KEY_HERE");

        Connection connection = client.connect(serverHost, serverPort, options);
        Sender sender = connection.openSender(address);
        sender.send(Message.create("Hello World"));

        Receiver receiver = connection.openReceiver(address);
        Delivery delivery = receiver.receive();
        Message<String> received = delivery.message();
        System.out.println("Received message with body: " + received.body().toString());
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.