In-memory AMQP for development and testing

Question    Votes: 1    Answers: 2

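I use RabbitMQ in my service, but due to restrictions I cannot download it locally. Instead, I want to use an in-memory broker and thought Qpid could work. With the configuration below, the logs show that the Qpid broker starts fine, but when Spring Boot tries to send a message it cannot connect.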

    @Bean
    Broker broker() throws Exception {
        // Embedded Qpid broker; Broker and BrokerOptions come from org.apache.qpid.server.
        org.apache.qpid.server.Broker broker = new org.apache.qpid.server.Broker();
        BrokerOptions brokerOptions = new BrokerOptions();
        // These properties fill the ${qpid.amqp_port} and ${qpid.vhost} placeholders in the initial configuration.
        brokerOptions.setConfigProperty("qpid.amqp_port", "5672");
        brokerOptions.setConfigProperty("qpid.broker.defaultPreferenceStoreAttributes", "{\"type\": \"Noop\"}");
        brokerOptions.setConfigProperty("qpid.vhost", "/");
        // Keep the broker configuration in memory only.
        brokerOptions.setConfigurationStoreType("Memory");
        brokerOptions.setStartupLoggedToSystemOut(false);
        broker.startup(brokerOptions);
        return broker;
    }

In resources I have the following initial configuration:

{
  "name": "Embedded Test Broker",
  "modelVersion": "6.1",
  "authenticationproviders" : [{
    "name": "password",
    "type": "Plain",
    "secureOnlyMechanisms": [],
    "users": [{"name": "guest", "password": "guest", "type": "managed"}]
  }],
  "ports": [{
    "name": "AMQP",
    "port": "${qpid.amqp_port}",
    "authenticationProvider": "password",
    "protocols": [ "AMQP_0_9_1" ],
    "transports": [ "TCP" ],
    "virtualhostaliases": [{
      "name": "${qpid.vhost}",
      "type": "nameAlias"
    }]
  }],
  "virtualhostnodes" : [{
    "name": "${qpid.vhost}",
    "type": "Memory",
    "virtualHostInitialConfiguration": "{ \"type\": \"Memory\" }"
  }]
}

The error I get:

2018-08-15 19:55:07 CachingConnectionFactory - Attempting to connect to: [localhost:5672]
2018-08-15 19:55:07 NonBlockingConnection - Identified transport encryption as NONE
2018-08-15 19:55:07 NonBlockingConnection - Read 8 byte(s)
2018-08-15 19:55:07 TaskExecutorImpl - Submitting Task['create' on '/127.0.0.1:56891(?)'] to executor Broker-Config
2018-08-15 19:55:07 TaskExecutorImpl - Performing Task['create' on '/127.0.0.1:56891(?)']
2018-08-15 19:55:07 open - [con:2(/127.0.0.1:56891)] CON-1001 : Open : Destination : AMQP(127.0.0.1:5672) : Protocol Version : 0-9-1
2018-08-15 19:55:07 TaskExecutorImpl - Running runnable com.google.common.util.concurrent.Futures$6@3979d0 through executor interface
2018-08-15 19:55:07 TaskExecutorImpl - Running runnable com.google.common.util.concurrent.Futures$6@db842 through executor interface
2018-08-15 19:55:07 TaskExecutorImpl - Running runnable com.google.common.util.concurrent.Futures$6@4c9750 through executor interface
2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ProtocolHeader [AMQP0091 ]
2018-08-15 19:55:07 TaskExecutorImpl - Task['create' on '/127.0.0.1:56891(?)'] performed successfully with result: null
2018-08-15 19:55:07 AMQPConnection_0_8Impl - SEND: Frame channelId: 0, bodyFrame: [ConnectionStartBodyImpl: versionMajor=0, versionMinor=9, serverProperties={product=[LONG_STRING: qpid], version=[LONG_STRING: 6.1.1], qpid.build=[LONG_STRING: 1775107], qpid.instance_name=[LONG_STRING: Embedded Test Broker], qpid.close_when_no_route=[LONG_STRING: true], qpid.message_compression_supported=[LONG_STRING: true], qpid.confirmed_publish_supported=[LONG_STRING: true], qpid.virtualhost_properties_supported=[LONG_STRING: true], qpid.queue_lifetime_supported=[LONG_STRING: true]}, mechanisms=[80, 76, 65, 73, 78, 32, 67, 82, 65, 77, 45, 77, 68, 53, 32, 83, 67, 82, 65, 77, 45, 83, 72, 65, 45, 49, 32, 83, 67, 82, 65, 77, 45, 83, 72, 65, 45, 50, 53, 54], locales=[101, 110, 95, 85, 83]]
2018-08-15 19:55:07 FieldTable - FieldTable::writeToBuffer: Writing encoded length of 308...
2018-08-15 19:55:07 FieldTable - {product=[LONG_STRING: qpid], version=[LONG_STRING: 6.1.1], qpid.build=[LONG_STRING: 1775107], qpid.instance_name=[LONG_STRING: Embedded Test Broker], qpid.close_when_no_route=[LONG_STRING: true], qpid.message_compression_supported=[LONG_STRING: true], qpid.confirmed_publish_supported=[LONG_STRING: true], qpid.virtualhost_properties_supported=[LONG_STRING: true], qpid.queue_lifetime_supported=[LONG_STRING: true]}
2018-08-15 19:55:07 NonBlockingConnection - Written 379 bytes
2018-08-15 19:55:07 NonBlockingConnection - Read 0 byte(s)
2018-08-15 19:55:07 NonBlockingConnection - Read 443 byte(s)
2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ConnectionStartOk[ clientProperties: {connection_name=[LONG_STRING: rabbitConnectionFactory#1800efd:1], product=[LONG_STRING: RabbitMQ], copyright=[LONG_STRING: Copyright (c) 2007-2017 Pivotal Software, Inc.], capabilities=[FIELD_TABLE: {exchange_exchange_bindings=[BOOLEAN: true], connection.blocked=[BOOLEAN: true], authentication_failure_close=[BOOLEAN: true], basic.nack=[BOOLEAN: true], publisher_confirms=[BOOLEAN: true], consumer_cancel_notify=[BOOLEAN: true]}], information=[LONG_STRING: Licensed under the MPL. See http://www.rabbitmq.com/], version=[LONG_STRING: 5.1.2], platform=[LONG_STRING: Java]} mechanism: PLAIN response: ******** locale: en_US ]
2018-08-15 19:55:07 AMQPConnection_0_8Impl - SASL Mechanism selected: PLAIN Locale : en_US
2018-08-15 19:55:07 AMQPConnection_0_8Impl - Connected as: Subject:
    Principal: guest

2018-08-15 19:55:07 AMQPConnection_0_8Impl - SEND: Frame channelId: 0, bodyFrame: [ConnectionTuneBodyImpl: channelMax=256, frameMax=262136, heartbeat=0]
2018-08-15 19:55:07 BrokerDecoder - Frame handled in 1 ms.
2018-08-15 19:55:07 NonBlockingConnection - Written 20 bytes
2018-08-15 19:55:07 NonBlockingConnection - Read 0 byte(s)
2018-08-15 19:55:07 NonBlockingConnection - Read 36 byte(s)
2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ConnectionTuneOk[ channelMax: 256 frameMax: 262136 heartbeat: 60 ]
2018-08-15 19:55:07 BrokerDecoder - Frame handled in 0 ms.
2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ConnectionOpen[ virtualHost: / capabilities: null insist: false ]
2018-08-15 19:55:07 AMQPConnection_0_8Impl - SEND: Frame channelId: 0, bodyFrame: [ConnectionCloseBodyImpl: replyCode=404, replyText=Unknown virtual host: '/', classId=10, methodId=40]
2018-08-15 19:55:07 BrokerDecoder - Frame handled in 0 ms.
2018-08-15 19:55:07 NonBlockingConnection - Written 44 bytes
2018-08-15 19:55:07 NonBlockingConnection - Read 0 byte(s)
2018-08-15 19:55:07 NonBlockingConnection - Read 12 byte(s)
2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ConnectionCloseOk
2018-08-15 19:55:07 NonBlockingConnection - Closing /127.0.0.1:56891
2018-08-15 19:55:07 BrokerDecoder - Frame handled in 1 ms.
2018-08-15 19:55:07 MultiVersionProtocolEngine - Closed
2018-08-15 19:55:07 TaskExecutorImpl - Running runnable org.apache.qpid.server.transport.AbstractAMQPConnection$1@ddf89f through executor interface
2018-08-15 19:55:07 ForgivingExceptionHandler - An unexpected connection driver error occured
java.net.SocketException: socket closed
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(Unknown Source)
    at java.net.SocketInputStream.read(Unknown Source)
    at java.net.SocketInputStream.read(Unknown Source)
    at java.io.BufferedInputStream.fill(Unknown Source)
    at java.io.BufferedInputStream.read(Unknown Source)
    at java.io.DataInputStream.readUnsignedByte(Unknown Source)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:580)
    at java.lang.Thread.run(Unknown Source)
2018-08-15 19:55:07 NonBlockingConnection - Identified transport encryption as NONE
2018-08-15 19:55:07 close - [con:2(guest@/127.0.0.1:56891)] CON-1002 : Close
2018-08-15 19:55:07 NonBlockingConnection - Read 8 byte(s)
2018-08-15 19:55:07 TaskExecutorImpl - Submitting Task['create' on '/0:0:0:0:0:0:0:1:56892(?)'] to executor Broker-Config
2018-08-15 19:55:07 TaskExecutorImpl - Performing Task['create' on '/0:0:0:0:0:0:0:1:56892(?)']
2018-08-15 19:55:07 open - [con:3(/0:0:0:0:0:0:0:1:56892)] CON-1001 : Open : Destination : AMQP(0:0:0:0:0:0:0:1:5672) : Protocol Version : 0-9-1
2018-08-15 19:55:07 TaskExecutorImpl - Running runnable com.google.common.util.concurrent.Futures$6@146d207 through executor interface
2018-08-15 19:55:07 TaskExecutorImpl - Running runnable com.google.common.util.concurrent.Futures$6@1d1fae1 through executor interface
2018-08-15 19:55:07 TaskExecutorImpl - Running runnable com.google.common.util.concurrent.Futures$6@ed29c3 through executor interface
2018-08-15 19:55:07 TaskExecutorImpl - Task['create' on '/0:0:0:0:0:0:0:1:56892(?)'] performed successfully with result: null
2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ProtocolHeader [AMQP0091 ]
2018-08-15 19:55:07 AMQPConnection_0_8Impl - SEND: Frame channelId: 0, bodyFrame: [ConnectionStartBodyImpl: versionMajor=0, versionMinor=9, serverProperties={product=[LONG_STRING: qpid], version=[LONG_STRING: 6.1.1], qpid.build=[LONG_STRING: 1775107], qpid.instance_name=[LONG_STRING: Embedded Test Broker], qpid.close_when_no_route=[LONG_STRING: true], qpid.message_compression_supported=[LONG_STRING: true], qpid.confirmed_publish_supported=[LONG_STRING: true], qpid.virtualhost_properties_supported=[LONG_STRING: true], qpid.queue_lifetime_supported=[LONG_STRING: true]}, mechanisms=[80, 76, 65, 73, 78, 32, 67, 82, 65, 77, 45, 77, 68, 53, 32, 83, 67, 82, 65, 77, 45, 83, 72, 65, 45, 49, 32, 83, 67, 82, 65, 77, 45, 83, 72, 65, 45, 50, 53, 54], locales=[101, 110, 95, 85, 83]]
2018-08-15 19:55:07 FieldTable - FieldTable::writeToBuffer: Writing encoded length of 308...
2018-08-15 19:55:07 FieldTable - {product=[LONG_STRING: qpid], version=[LONG_STRING: 6.1.1], qpid.build=[LONG_STRING: 1775107], qpid.instance_name=[LONG_STRING: Embedded Test Broker], qpid.close_when_no_route=[LONG_STRING: true], qpid.message_compression_supported=[LONG_STRING: true], qpid.confirmed_publish_supported=[LONG_STRING: true], qpid.virtualhost_properties_supported=[LONG_STRING: true], qpid.queue_lifetime_supported=[LONG_STRING: true]}
2018-08-15 19:55:07 NonBlockingConnection - Written 379 bytes
2018-08-15 19:55:07 NonBlockingConnection - Read 0 byte(s)
2018-08-15 19:55:07 NonBlockingConnection - Read 443 byte(s)
2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ConnectionStartOk[ clientProperties: {connection_name=[LONG_STRING: rabbitConnectionFactory#1800efd:1], product=[LONG_STRING: RabbitMQ], copyright=[LONG_STRING: Copyright (c) 2007-2017 Pivotal Software, Inc.], capabilities=[FIELD_TABLE: {exchange_exchange_bindings=[BOOLEAN: true], connection.blocked=[BOOLEAN: true], authentication_failure_close=[BOOLEAN: true], basic.nack=[BOOLEAN: true], publisher_confirms=[BOOLEAN: true], consumer_cancel_notify=[BOOLEAN: true]}], information=[LONG_STRING: Licensed under the MPL. See http://www.rabbitmq.com/], version=[LONG_STRING: 5.1.2], platform=[LONG_STRING: Java]} mechanism: PLAIN response: ******** locale: en_US ]
2018-08-15 19:55:07 AMQPConnection_0_8Impl - SASL Mechanism selected: PLAIN Locale : en_US
2018-08-15 19:55:07 AMQPConnection_0_8Impl - Connected as: Subject:
    Principal: guest

2018-08-15 19:55:07 AMQPConnection_0_8Impl - SEND: Frame channelId: 0, bodyFrame: [ConnectionTuneBodyImpl: channelMax=256, frameMax=262136, heartbeat=0]
2018-08-15 19:55:07 BrokerDecoder - Frame handled in 1 ms.
2018-08-15 19:55:07 NonBlockingConnection - Written 20 bytes
2018-08-15 19:55:07 NonBlockingConnection - Read 0 byte(s)
2018-08-15 19:55:07 NonBlockingConnection - Read 36 byte(s)
2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ConnectionTuneOk[ channelMax: 256 frameMax: 262136 heartbeat: 60 ]
2018-08-15 19:55:07 BrokerDecoder - Frame handled in 0 ms.
2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ConnectionOpen[ virtualHost: / capabilities: null insist: false ]
2018-08-15 19:55:07 AMQPConnection_0_8Impl - SEND: Frame channelId: 0, bodyFrame: [ConnectionCloseBodyImpl: replyCode=404, replyText=Unknown virtual host: '/', classId=10, methodId=40]
2018-08-15 19:55:07 BrokerDecoder - Frame handled in 0 ms.
2018-08-15 19:55:07 NonBlockingConnection - Written 44 bytes
2018-08-15 19:55:07 NonBlockingConnection - Read 0 byte(s)
2018-08-15 19:55:07 NonBlockingConnection - Read 12 byte(s)
2018-08-15 19:55:07 RequestServiceImpl - Failure sending message to queue...storing for later delivery {java.io.IOException}
2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ConnectionCloseOk
2018-08-15 19:55:07 NonBlockingConnection - Closing /0:0:0:0:0:0:0:1:56892
2018-08-15 19:55:07 BrokerDecoder - Frame handled in 0 ms.
2018-08-15 19:55:07 MultiVersionProtocolEngine - Closed
2018-08-15 19:55:07 ForgivingExceptionHandler - An unexpected connection driver error occured
java.net.SocketException: socket closed
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(Unknown Source)
    at java.net.SocketInputStream.read(Unknown Source)
    at java.net.SocketInputStream.read(Unknown Source)
    at java.io.BufferedInputStream.fill(Unknown Source)
    at java.io.BufferedInputStream.read(Unknown Source)
    at java.io.DataInputStream.readUnsignedByte(Unknown Source)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:580)
    at java.lang.Thread.run(Unknown Source)
2018-08-15 19:55:07 TaskExecutorImpl - Running runnable org.apache.qpid.server.transport.AbstractAMQPConnection$1@c30ce0 through executor interface
2018-08-15 19:55:07 close - [con:3(guest@null)] CON-1002 : Close
2018-08-15 19:55:20 BrokerImpl - Assigning target sizes based on total target 207591833
2018-08-15 19:55:20 BrokerImpl - Assigning target size 207591833 to vhost VirtualHost[id=556d86e7-d4f7-4428-9a54-86b280bdd515, name=/, type=Memory]
2018-08-15 19:55:20 AbstractVirtualHost - Allocating target size to queues, total target: 207591833 ; total enqueued size 0
spring-amqp qpid
2 Answers
1 vote

If you need to mimic RabbitMQ behavior, I suggest using rabbitmq-mock. It removes the need for a port (which can be a problem in CI runs) and shortens startup time.

If you use Maven, add the dependency to your build:

<dependency>
    <groupId>com.github.fridujo</groupId>
    <artifactId>rabbitmq-mock</artifactId>
    <version>1.0.10</version>
    <scope>test</scope>
</dependency>

Then replace or override your Spring ConnectionFactory:

@Bean
public ConnectionFactory connectionFactory() {
    return new CachingConnectionFactory(MockConnectionFactoryFactory.build());
}

RabbitTemplate and SimpleMessageListenerContainer should both behave as if they were connected to a real RabbitMQ broker.

Here is a complete sample test using Spring-Boot.
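
If you keep an embedded broker that binds a TCP port instead, one common way to reduce port clashes in CI is to ask the operating system for a free port rather than hardcoding 5672. The sketch below uses only the JDK; the class name `FreePort` is hypothetical, and passing the result to the question's `qpid.amqp_port` property (and to the client's connection settings) is an assumption about how you would wire it in.

```java
import java.io.IOException;
import java.net.ServerSocket;

final class FreePort {
    private FreePort() {}

    /** Asks the OS for a currently unused TCP port by binding to port 0, then releases it. */
    static int find() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        int port = find();
        // Assumption: this value would replace the hardcoded "5672" given to "qpid.amqp_port".
        System.out.println("qpid.amqp_port=" + port);
    }
}
```

The port is released before the broker binds it, so another process could in principle grab it in between; for test runs this small window is usually acceptable.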


0 votes

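Your AMQP client is requesting a virtual host that does not exist on the Broker. The Broker replies with "Unknown virtual host". Check the connection details the client passes.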

2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ConnectionOpen[ virtualHost: / capabilities: null insist: false ]
2018-08-15 19:55:07 AMQPConnection_0_8Impl - SEND: Frame channelId: 0, bodyFrame: [ConnectionCloseBodyImpl: replyCode=404, replyText=Unknown virtual host: '/', classId=10, methodId=40]
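
In a log this verbose, the close frame is easy to miss. As a rough sketch, the reply code and text can be pulled out of such a line with a regular expression. The class name `CloseFrame` is hypothetical, and the pattern assumes the exact field layout that Qpid 6.1.1 prints in the line quoted above; other versions may format it differently.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class CloseFrame {
    // Assumes the "replyCode=..., replyText=..., classId=" layout seen in the quoted log line.
    private static final Pattern CLOSE =
            Pattern.compile("ConnectionCloseBodyImpl: replyCode=(\\d+), replyText=(.*?), classId=");

    final int replyCode;
    final String replyText;

    private CloseFrame(int replyCode, String replyText) {
        this.replyCode = replyCode;
        this.replyText = replyText;
    }

    /** Returns the close reason if the line contains a ConnectionClose frame, otherwise empty. */
    static Optional<CloseFrame> parse(String logLine) {
        Matcher m = CLOSE.matcher(logLine);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new CloseFrame(Integer.parseInt(m.group(1)), m.group(2)));
    }

    public static void main(String[] args) {
        String line = "2018-08-15 19:55:07 AMQPConnection_0_8Impl - SEND: Frame channelId: 0, "
                + "bodyFrame: [ConnectionCloseBodyImpl: replyCode=404, "
                + "replyText=Unknown virtual host: '/', classId=10, methodId=40]";
        // Prints: 404 Unknown virtual host: '/'
        parse(line).ifPresent(f -> System.out.println(f.replyCode + " " + f.replyText));
    }
}
```

Here the 404 reply arrives only after the log reports "Connected as: ... Principal: guest", which suggests authentication succeeded and the virtual host name is what the broker rejected.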

Also, I notice you are using a fairly old Broker version (6.1.1); I suggest moving to a newer one.
