Apache ActiveMq Artemis客户端重新连接到群集HA复制/共享数据存储中的下一个可用代理

问题描述 投票:0回答:2

Broker.xml(host1)和host2,只是端口号更改为61616,而从属端口作为配置。参考Apache Artemis client fail over discovery

<?xml version="1.0" encoding="UTF-8" standalone="no"?>

<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
   <core xmlns="urn:activemq:core">

      <bindings-directory>./data/bindings</bindings-directory>

      <journal-directory>./data/journal</journal-directory>

      <large-messages-directory>./data/largemessages</large-messages-directory>

      <paging-directory>./data/paging</paging-directory>
      <!-- Connectors -->

      <connectors>
         <connector name="netty-connector">tcp://10.64.60.100:61617</connector><!-- direct ip addres of host myhost1 -->
         <connector name="broker2-connector">tcp://myhost2:61616</connector> <!-- ip 10.64.60.101 <- mocked up ip for security reasons -->
      </connectors>

      <!-- Acceptors -->
      <acceptors>
         <acceptor name="amqp">tcp://0.0.0.0:61617?amqpIdleTimeout=0;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP;useEpoll=true</acceptor>
      </acceptors>

      <ha-policy>
       <replication>
          <master/>
       </replication>
    </ha-policy>

      <cluster-connections>
         <cluster-connection name="myhost1-cluster">
            <connector-ref>netty-connector</connector-ref>
            <retry-interval>500</retry-interval>
            <use-duplicate-detection>true</use-duplicate-detection>
            <message-load-balancing>ON_DEMAND</message-load-balancing>
            <max-hops>1</max-hops>
              <static-connectors>
             <connector-ref>broker2-connector</connector-ref> <!-- defined in the connectors -->
            </static-connectors>
         </cluster-connection>
      </cluster-connections>

      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="amq"/>
            <permission type="deleteNonDurableQueue" roles="amq"/>
            <permission type="createDurableQueue" roles="amq"/>
            <permission type="deleteDurableQueue" roles="amq"/>
            <permission type="createAddress" roles="amq"/>
            <permission type="deleteAddress" roles="amq"/>
            <permission type="consume" roles="amq"/>
            <permission type="browse" roles="amq"/>
            <permission type="send" roles="amq"/>
            <permission type="manage" roles="amq"/>
         </security-setting>
      </security-settings>

      <address-settings>
         <!--default for catch all-->
         <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
             <auto-delete-queues>false</auto-delete-queues>
            <auto-delete-created-queues>false</auto-delete-created-queues>
            <auto-delete-addresses>false</auto-delete-addresses>
        </address-setting>
      </address-settings>
   </core>
</configuration>

使用Java和生产者模板的客户端将消息推送到直接端点,然后路由到队列产生消息:

Java-camel-producer-client

    package com.demo.artemis;

    import org.apache.camel.CamelContext;
    import org.apache.camel.ProducerTemplate;
    import org.apache.camel.spring.SpringCamelContext;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;

    public class ToRoute {

         public static void main(String[] args) throws Exception { 
                ApplicationContext appContext = new ClassPathXmlApplicationContext(
                        "camel-context-producer.xml");
                ProducerTemplate template = null;
                CamelContext camelContext = SpringCamelContext.springCamelContext(
                        appContext, false);
                try {            
                    camelContext.start();
                     template = camelContext.createProducerTemplate();
                    String msg = null;
                    int loop =0;
                    int i = 0;
                    while(true) {
                        if (i%10000==0) {
                            i=1;
                            loop=loop+1;
                        }
                        if(loop==2) break;
                        msg ="---> "+(++i);             
                        template.sendBody("direct:toDWQueue", msg);
                    }
                } finally {
                    if (template != null) {
                        template.stop();
                    }
                    camelContext.stop();
                }
         }
    }

骆驼上下文将消息发送到队列:说camel-producer-client

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:aop="http://www.springframework.org/schema/aop" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:jee="http://www.springframework.org/schema/jee" 
    xmlns:tx="http://www.springframework.org/schema/tx" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/aop 
        http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.1.xsd
        http://www.springframework.org/schema/jee
        http://www.springframework.org/schema/jee/spring-jee-3.1.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
        http://camel.apache.org/schema/spring
        http://camel.apache.org/schema/spring/camel-spring.xsd">
     <bean id="jmsConnectionFactory" class="org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory">
      <constructor-arg index="0" value="(tcp://myhost:61616,tcp://myhost1:61617)?ha=true;reconnectAttempts=-1;"/>
    </bean>

    <bean id="jmsPooledConnectionFactory" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory" init-method="start" destroy-method="stop">
      <property name="maxConnections" value="10" />
      <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>

    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
      <property name="connectionFactory" ref="jmsPooledConnectionFactory" />
      <property name="concurrentConsumers" value="5" />
    </bean>

    <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
      <property name="configuration" ref="jmsConfig" />
      <property name="streamMessageTypeEnabled" value="true"/>
    </bean>

        <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
      <endpoint id="myqueue" uri="jms:queue:myExampleQueue" />

      <route>
        <from uri="direct:toMyExample"/>
       <transform>
      <simple>MSG FRM DIRECT TO MyExampleQueue : ${bodyAs(String)}</simple>
        </transform>
        <to uri="ref:myqueue"/>
      </route>
    </camelContext>

  </beans>

骆驼使用者从MyExampleQueue中读取并转换为OutboundQueue :(此上下文文件使用骆驼“ org.apache.camel.spring.Main –accamel-consumer-client

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:aop="http://www.springframework.org/schema/aop" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:jee="http://www.springframework.org/schema/jee" 
    xmlns:tx="http://www.springframework.org/schema/tx" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/aop 
        http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.1.xsd
        http://www.springframework.org/schema/jee
        http://www.springframework.org/schema/jee/spring-jee-3.1.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
        http://camel.apache.org/schema/spring
        http://camel.apache.org/schema/spring/camel-spring.xsd">

        <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
            <property name="environment">
                <props>
                    <prop key="java.naming.factory.initial">org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory</prop>
                                <prop key="connectionFactory.ConnectionFactory">(tcp://myhost1:61617,tcp://myhost2:61616)?ha=true;retryInterval=1000;retryIntervalMultiplier=1.0;reconnectAttempts=-1;</prop>
                                  <prop key="queue.queue/MyExampleQueue">MyExampleQueue</prop>
                    <prop key="queue.queue/OutBoundQueue">OutBoundQueue</prop>
                </props>
            </property>
        </bean>
           <bean id="jndiFactoryBean" class="org.springframework.jndi.JndiObjectFactoryBean">
            <property name="jndiName" value="ConnectionFactory"/> 
            <property name="jndiTemplate" ref="jndiTemplate"/>
              <property name="cache" value="true"/>     
        </bean> 

        <bean id="jndiDestinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver">
            <property name="jndiTemplate" ref="jndiTemplate"/>
              <property name="cache" value="true"/>
    <!-- dynamic destination if the destination name  is not found in JNDI -->
             <property name="fallbackToDynamicDestination" value="true"/>
        </bean>

       <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
            <property name="connectionFactory" ref="jndiFactoryBean"/>
            <property name="destinationResolver" ref="jndiDestinationResolver"/>
                 <property name="concurrentConsumers" value="10" />
        </bean>

        <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
               <property name="configuration" ref="jmsConfig" />
        </bean>

        <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
      <endpoint id="inqueue" uri="jms:queue:MyExampleQueue" />
      <endpoint id="outqueue" uri="jms:queue:OutBoundQueue" />

      <route>
        <from uri="ref:inqueue" />
        <convertBodyTo type="java.lang.String" />
        <transform>
          <simple>MSG FRM MyExampleQueue TO OutboundQueue : ${bodyAs(String)}</simple>
        </transform>
        <to uri="ref:outqueue" />
      </route>
    </camelContext>
  </beans>

我认为我面临的问题与这张票相似http://activemq.2283324.n4.nabble.com/Connection-to-the-backup-after-master-shutdown-td4753770.html

[使用Camel上下文时,我看到客户端使用者在主服务器不可用时被重定向到从服务器(我正在将JNDI与JMSPoolConnectionFactory一起使用。)>]

Java-producer-client使用生产者模板发送消息以定向端点,端点被转换并路由到队列。当主服务器关闭时,客户端将无法故障转移并连接到从服务器。 (问题:当代理处于HA模式时,这是预期的行为)。

尝试重新连接后,当主机关闭时,我看到以下异常。

Caused by: javax.jms.JMSException: AMQ219016: Connection failure detected. Unblocking a blocking call that will never get a response

另一方面,使用主设备启动的camel-context-consumer能够自动故障转移到从设备。在控制台中,我确实注意到在从属主机和处理数据中有10个使用者计数。但是当主服务器备份时,客户端没有切换到活动的主节点。问题:这也是预期的吗?

用于创建以下约定的连接工厂。

(tcp://myhost:61616,tcp://myhost1:61617)?ha=true;reconnectAttempts=-1;

问题:即使使用静态连接器,我们是否也需要配置广播和发现组?

Broker.xml(host1)和host2,只是端口号更改为61616,而从属端口作为配置。参考Apache Artemis客户端故障转移发现

java apache-camel activemq-artemis
2个回答
1
投票

错误说明“取消阻止永远不会得到响应的阻止调用”的错误


0
投票

在生产者中处理了JMSException之后(也能够成功进行故障转移)

© www.soinside.com 2019 - 2024. All rights reserved.