Camel Spring remoting with ActiveMQ Artemis using the Proton/Qpid client - message send hangs

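Question (votes: 0, answers: 1)
  • When I send messages using a Camel Spring remoting configuration, the send hangs. The producer and the consumer run in different JVMs.
    • Apache ActiveMQ Artemis version 2.14.0
    • Camel (2.20.0), Qpid (0.54.0), pooled-jms (1.1.1)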

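I use the LoadMessageSupport class to push the message. I can see the Camel route being invoked, and the debug log messages are shown below.

I noticed that a producer session is active in the ActiveMQ Artemis console.

Any clues on how to debug this issue, or what might be causing it?

There are some Netty-related debug errors, which I safely ignored.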

...
DEBUG [main] (DefaultManagementAgent.java:470) - Registered MBean with ObjectName: org.apache.camel:context=camel,type=components,name="bean"
DEBUG [main] (DefaultComponent.java:266) - Cannot resolve property placeholders on component: org.apache.camel.component.bean.BeanComponent@cda0432 as PropertiesComponent is not in use
DEBUG [main] (AbstractAutowireCapableBeanFactory.java:448) - Creating instance of bean 'org.apache.camel.component.jackson.converter.JacksonTypeConverters'
DEBUG [main] (AbstractAutowireCapableBeanFactory.java:484) - Finished creating instance of bean 'org.apache.camel.component.jackson.converter.JacksonTypeConverters'
 INFO [main] (CamelLogger.java:159) - ID-local-vm-1624040900482-0-1 >>> (processMessage) from(direct://proxy-msg-handler) --> log[Log message on incoming message with body] <<< Pattern:InOnly, Headers:{breadcrumbId=ID-local-vm-1624040900482-0-1}, BodyType:org.apache.camel.component.bean.BeanInvocation, Body:BeanInvocation public abstract void com.myexample.MessageHandler.processMessage(com.myexample.MessageType,java.lang.String) with [ITEM_DESCRIPTION, {"info": " my name"}]]
DEBUG [main] (CamelLogger.java:153) - Log message on incoming message with body
 INFO [main] (CamelLogger.java:159) - ID-local-vm-1624040900482-0-1 >>> (SubmitNotificationEvent) log[Log message on incoming message with body] --> amqpcomponent://queue:message.queue <<< Pattern:InOnly, Headers:{breadcrumbId=ID-local-vm-1624040900482-0-1}, BodyType:org.apache.camel.component.bean.BeanInvocation, Body:BeanInvocation public abstract void com.myexample.MessageHandler.processMessage(com.myexample.MessageType,java.lang.String) with [ITEM_DESCRIPTION, {"info": " my name"}]]
DEBUG [main] (SendProcessor.java:147) - >>>> service-event-queue://queue:message.queue Exchange[ID-local-vm-1624040900482-0-1]
DEBUG [main] (InternalLoggerFactory.java:45) - Using SLF4J as the default logging framework
...
DEBUG [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpConnectionBuilder.java:84) - AmqpConnection { ID:6d0c8673-6a92-401d-a239-12ec696fc9d3:1 } is now open: 
 INFO [AmqpProvider :(1):[amqp://localhost:5672]] (JmsConnection.java:1339) - Connection ID:6d0c8673-6a92-401d-a239-12ec696fc9d3:1 connected to server: amqp://localhost:5672
DEBUG [main] (JmsTemplate.java:492) - Executing callback on JMS Session: JmsPoolSession { org.apache.qpid.jms.JmsSession@7fd26ad8 }
DEBUG [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProducerBuilder.java:68) - Creating AmqpFixedProducer for: null
DEBUG [main] (JmsConfiguration.java:622) - Sending JMS message to: message.queue with message: JmsObjectMessageFacade 

After enabling TRACE-level logging, I noticed the messages below:

DEBUG [main] (JmsConfiguration.java:622) - Sending JMS message to: message.queue with message: JmsObjectMessageFacade { org.apache.qpid.jms.provider.amqp.message.AmqpJmsObjectMessageFacade@36cc9385 }
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpFixedProducer.java:100) - Holding Message send until credit is available.
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProvider.java:1625) - IdleTimeoutCheck rescheduling with delay: 15000
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:560) - New incoming data read: PooledUnsafeDirectByteBuf(ridx: 0, widx: 8, cap: 65536)
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProtocolTracer.java:49) - [1673389762:0] RECV: Empty Frame
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProtocolTracer.java:54) - [1673389762:0] SENT: Empty Frame
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:259) - Attempted write of buffer: PooledUnsafeDirectByteBuf(ridx: 0, widx: 8, cap: 8/8)
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:273) - Attempted flush of pending writes
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProvider.java:1625) - IdleTimeoutCheck rescheduling with delay: 15000
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:560) - New incoming data read: PooledUnsafeDirectByteBuf(ridx: 0, widx: 8, cap: 65536)
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProtocolTracer.java:49) - [1673389762:0] RECV: Empty Frame
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProtocolTracer.java:54) - [1673389762:0] SENT: Empty Frame
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:259) - Attempted write of buffer: PooledUnsafeDirectByteBuf(ridx: 0, widx: 8, cap: 8/8)
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:273) - Attempted flush of pending writes
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProvider.java:1625) - IdleTimeoutCheck rescheduling with delay: 15000
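A note on the TRACE output above: in the Qpid JMS client, "Holding Message send until credit is available" is logged when the sender link currently has no AMQP link credit from the broker, so the synchronous send waits, and by default that wait has no time limit. As a hedged sketch (not part of the original configuration), the Qpid JMS URI option jms.sendTimeout, in milliseconds, can be appended to the remoteURI so that a stalled send fails with an exception instead of blocking indefinitely. The helper class name and the 30-second value below are illustrative assumptions.

```java
public class SendTimeoutUri {

    // Appends the Qpid JMS option jms.sendTimeout (milliseconds) to a remote URI,
    // using '?' or '&' depending on whether the URI already has a query string.
    static String withSendTimeout(String remoteUri, long timeoutMillis) {
        String separator = remoteUri.contains("?") ? "&" : "?";
        return remoteUri + separator + "jms.sendTimeout=" + timeoutMillis;
    }

    public static void main(String[] args) {
        // The base URI is the one from the question's jmsConnectionFactory bean.
        String uri = withSendTimeout("amqp://localhost:5672?amqp.traceFrames=true", 30_000);
        System.out.println(uri);
    }
}
```

When the resulting value is placed in the Spring XML remoteURI property, the & between options has to be written as &amp;amp; to keep the XML well-formed.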
  • Below is the context XML I use to send messages from the Java class.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
     http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
         http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
    http://www.springframework.org/schema/util  http://www.springframework.org/schema/util/spring-util.xsd">

  <bean id="jmsConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory">
     <property name="remoteURI" value="amqp://localhost:5672?amqp.traceFrames=true"/>
  </bean>

  <bean id="jpcf" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory" init-method="start" destroy-method="stop" >
    <property name="maxConnections" value="3" />
    <property name="connectionFactory" ref="jmsConnectionFactory" />
  </bean>

  <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="jpcf" />
    <property name="concurrentConsumers" value="3" />
  </bean>

  <bean id="amqpcomponent" class="org.apache.camel.component.amqp.AMQPComponent">
    <property name="configuration" ref="jmsConfig" />
  </bean>
  
  
    <!-- Camel Spring Remoting Interface -->
    <camel:proxy id="proxyObject" binding="false" serviceUrl="direct:proxy-msg-handler" serviceInterface="com.myexample.MessageHandler"/>       

    <!-- Bean that initialize the Spring Remoting for handling message -->
    <bean id="BeanProxy" class="com.myexample.MessageProducer">
        <property name="messageHandler" ref="proxyObject"/>
    </bean>
    
    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring" autoStartup="true" trace="true">
        
        <camel:route autoStartup="true" id="processMessage">
            <camel:from uri="direct:proxy-msg-handler"/>
            <camel:log message="Log incoming message" logName="Incoming" loggingLevel="DEBUG"/>
             <camel:inOnly uri="amqpcomponent:queue:message.queue"/>
        </camel:route>
    </camelContext>
</beans>
  • Java class that loads the context and invokes the remote Spring bean method.
    • The Java class below is used to push the message to the ActiveMQ Artemis queue
package com.myexample;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class LoadMessageSupport {

    public static void main(String... strings) {
        ClassPathXmlApplicationContext appContext = null;
        try {
            appContext = new ClassPathXmlApplicationContext("file:/path/to/context/message-handler-context.xml");
            MessageProducer messageProducer = appContext.getBean(MessageProducer.class);
            String message = "{ \"itemDesc\" : \"test description\" }";
            System.out.println(message);
            // MessageType is an enum already defined within the project
            messageProducer.sendMessage(MessageType.ITEM_DESC, message);

            //System.exit(0);

        } catch (Exception exe) {
            System.out.println("Something wrong... ");
            exe.printStackTrace();
        } finally {
            if (appContext != null) {
                // Closing the Spring context also stops the Camel context it manages
                appContext.close();
                System.out.println("camel context stopped...");
            }
        }
    }
}
  • Message handler interface

import org.apache.camel.InOnly;

@InOnly
public interface MessageHandler {
    void processMessage(MessageType type, Order order);
    void processMessage(MessageType type, String message); // trying to invoke this method
}
  • Producer class

public class MessageProducer {

    // using the proxy object within the producer object
    // this will invoke the spring bean using remote (rmi)
    private MessageHandler messageHandler;

    protected MessageHandler getMessageHandler() {
        return this.messageHandler;
    }

    public void setMessageHandler(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    // constructor
    public MessageProducer() {}

    public void sendMessage(MessageType type, Order order) {
        getMessageHandler().processMessage(type, order);
    }

    public void sendMessage(MessageType type, String message) {
        getMessageHandler().processMessage(type, message);
    }
}
  • Message receiver

import org.apache.camel.Handler;

public class MessageReceiver implements MessageHandler {

    @Handler
    public void processMessage(MessageType type, Order order) {
        System.out.println(" received type and ORDER info ...");
        // invoke methods for logical processing
    }

    @Handler
    public void processMessage(MessageType type, String message) {
        System.out.println(" received type and MESSAGE info for processing...");
        // invoke methods for logical processing
    }
}
spring apache-camel amqp activemq-artemis qpid
1 answer
0 votes

When I tried this earlier, it seems my VM did not have enough memory.

free -h
showed only 500MB left.

After restarting the VM, the message is now sent to the Artemis broker.
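
The free -h check above can also be done from Java on a Linux host. The following is a minimal sketch, assuming the VM exposes /proc/meminfo. The class and method names are hypothetical, and the parser only reads the MemAvailable line.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

public class MemAvailableCheck {

    // Returns MemAvailable in MiB from /proc/meminfo-style lines, or -1 if the line is absent.
    static long memAvailableMiB(List<String> lines) {
        for (String line : lines) {
            if (line.startsWith("MemAvailable:")) {
                // Expected shape: "MemAvailable:    512000 kB"
                String[] parts = line.trim().split("\\s+");
                return Long.parseLong(parts[1]) / 1024;
            }
        }
        return -1;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: a Linux host; other platforms have no /proc/meminfo.
        Path meminfo = Paths.get("/proc/meminfo");
        if (!Files.isReadable(meminfo)) {
            System.out.println("/proc/meminfo is not readable on this host");
            return;
        }
        long mib = memAvailableMiB(Files.readAllLines(meminfo));
        System.out.println("MemAvailable: " + mib + " MiB");
    }
}
```

Running this on the broker VM before sending gives a quick way to see whether the host is as short on memory as it was in the failing case.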
