Apache TomEE外部ActiveMQ资源未在分布式事务中回滚

问题描述 投票:1回答:1

我正在尝试在Apache TomEE中实现分布式事务。换句话说,流程是:

  • 消息读取器(即消息驱动的Bean)从队列(1)中读取并处理一个消息触发:
    • 一个或多个数据库插入/更新(2)
    • 将消息发送到另一个队列(3)

操作1、2和3都是TomEE控制的同一XA事务的一部分。因此,在任何情况下,它们要么全部失败,要么全部成功。

tomee.xml

<?xml version="1.0" encoding="UTF-8"?>
<tomee>
     this resource adapter is just necessary to tell tomee to not start internal ActiveMq instance
    <Resource id="MyAdapter" type="ActiveMQResourceAdapter">
        BrokerXmlConfig
        ServerUrl tcp://fakehost:666 
    </Resource> 

     <Resource id="jms/MyIncomingConnFactory" type="javax.jms.ConnectionFactory" class-name="org.apache.activemq.ActiveMQXAConnectionFactory">
        BrokerURL tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0 
    </Resource>

    <Resource id="jms/MyOutgoingConnFactory" type="javax.jms.ConnectionFactory" class-name="org.apache.activemq.ActiveMQXAConnectionFactory">
        BrokerURL tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0
    </Resource>

    <Resource id="jms/MyOutgoingQueue" class-name="org.apache.activemq.command.ActiveMQQueue">
        PhysicalName MY_OUTGOING_QUEUE 
    </Resource>

    <Resource id="jms/MyIncomingQueue" class-name="org.apache.activemq.command.ActiveMQQueue">
        PhysicalName MY_INCOMING_QUEUE 
    </Resource>

    <Resource id="jdbc/myDBXAPooled" type="DataSource">
        XaDataSource myDBXA
        DataSourceCreator dbcp
        JtaManaged true
        UserName TestUser
        Password TestPassword
        MaxWait 2000
        ValidationQuery SELECT 1    
        MaxActive 15
    </Resource> 

    <Resource id="myDBXA" type="XADataSource" class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
        Url jdbc:mysql://localhost:3306/test
        User TestUser
        Password TestPassword
    </Resource>
</tomee>

Springconfig.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jee="http://www.springframework.org/schema/jee"
            xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.2.xsd">


    <!-- <jee:jndi-lookup jndi-name="myDBXAPooled" id="myDatasource" resource-ref="true" />  -->
    <jee:jndi-lookup jndi-name="jms/MyOutgoingConnFactory" id="myOutgoingConnFactory" resource-ref="true" />
    <jee:jndi-lookup jndi-name="jms/MyIncomingConnFactory" id="myIncomingConnFactory" resource-ref="true" />  
    <jee:jndi-lookup jndi-name="jms/MyOutgoingQueue" id="myOutgoingQueue" resource-ref="true" />
    <jee:jndi-lookup jndi-name="jms/MyIncomingQueue" id="myIncomingQueue" resource-ref="true" />
    <jee:jndi-lookup jndi-name="jdbc/myDBXAPooled" id="myDatasource" resource-ref="true" />

    <tx:jta-transaction-manager/>
    <!-- <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/> -->
    <!-- the previous two ways of getting the transactionManager seems equivalent and both get Geronimo -->


</beans>

SpringConfig.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jee="http://www.springframework.org/schema/jee"
            xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.2.xsd">


    <bean id="messageListener" class="com.test.MyListener">
        <property name="connectionFactory" ref="myIncomingConnFactory" />
        <property name="destination" ref="myIncomingQueue" />
        <!-- <property name="sessionTransacted" value="true" /> -->
        <property name="concurrentConsumers" value="1" />
        <property name="maxConcurrentConsumers" value="6" />
        <property name="messageListener" ref="myMessageProcessor" />
        <property name="transactionManager" ref="transactionManager" />
        <property name="taskExecutor" ref="msgListenersTaskExecutor" />
    </bean>

    <bean id="myMessageProcessor" class="com.test.MyMessageReceiver">
        <property name="forwardConnectionFactory" ref="myOutgoingConnFactory" />
        <property name="forwardQueue" ref="myOutgoingQueue" />
        <property name="datasource" ref="myDatasource" />
    </bean>

    <bean id="msgListenersTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"/>



</beans>

MyMessageReceiver.java:

package com.test;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.sql.DataSource;

import org.apache.log4j.Logger;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

public class MyMessageReceiver implements MessageListener {

    static Logger log = Logger.getLogger(MyMessageReceiver.class);

    private ConnectionFactory forwardConnectionFactory;
    private Queue forwardQueue;
    private DataSource datasource;

    public void setForwardConnectionFactory(ConnectionFactory connFactory) {
        forwardConnectionFactory=connFactory;
    }
    public void setforwardQueue(Queue queue) {
        forwardQueue=queue;
    }
    public void setDatasource(DataSource ds) {
        datasource=ds;
    }

    @Override
    @Transactional(propagation=Propagation.REQUIRED)
    public void onMessage(Message message) {

        log.info("************************************");
        MyListener listener = (MyListener)SpringContext.getBean("messageListener");
        listener.printInfo();
        log.info("************************************");

        TextMessage msg = (TextMessage) message;
        String text = null;
        try {
            text = msg.getText();

            if (text != null) log.info("MESSAGE RECEIVED: "+ text);

            updateDB(text); // function call to update DB

            sendMsg(text);   // function call to publish messages to queue

           System.out.println("****************Rollback");
            // Throwing exception to rollback DB, Message should not be 
             // published and consumed message sent to a DLQ 
             //(Broker side DLQ configuration already done) 
        throw new RuntimeException();
            //if (text!=null && text.indexOf("rollback")!=-1) throw new RuntimeException("Message content includes the word rollback");

        } catch (Exception e) {
            log.error("Rolling back the entire XA transaction");
            log.error(e.getMessage());
            throw new RuntimeException("Rolled back because of "+e.getMessage());
        }

    }

    private void updateDB(String text) throws Exception {

        Connection conn = null;
        PreparedStatement ps = null;
        try {
            System.out.println("*******datasource "+datasource);
            conn = datasource.getConnection();
            System.out.println("*******conn "+conn.getMetaData().getUserName());
            if (conn!=null) {
                System.out.println("*******conn "+conn.getMetaData().getUserName());
                ps = conn.prepareStatement("INSERT INTO MY_TABLE (name) VALUES(?)");
                ps.setString(1, text);
                ps.executeUpdate();
            }
        } catch (Exception e) {
            throw e;
        } finally {
            if (ps!=null) {
                try {
                    ps.close();
                } catch (SQLException e) {
                    log.error(e.getMessage());
                    // do nothing
                }
            }
            if (conn!=null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    log.error(e.getMessage());
                    // do nothing
                }
            }
        }

    }

    private void sendMsg(String msgToBeSent) throws Exception {

        javax.jms.Connection conn = null;
        Session session = null;
        try {
            System.out.println("*************forwardConnectionFactory"+forwardConnectionFactory);
            conn = forwardConnectionFactory.createConnection();
            session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
            MessageProducer messageProducer = session.createProducer(forwardQueue);
            TextMessage msg = session.createTextMessage(msgToBeSent);
            messageProducer.send(msg);

        } catch (Exception e) {
            throw e;
        } finally {
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    // do nothing
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    // do nothing
                }
            }
        }
    }

}

MyListener.java:

package com.test;

import javax.transaction.Status;
import javax.transaction.SystemException;

import org.apache.log4j.Logger;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.jta.JtaTransactionManager;

public class MyListener extends DefaultMessageListenerContainer {

    static Logger log = Logger.getLogger(MyListener.class);

    public void printInfo() {

        try {

            log.info("trans manager="+((JtaTransactionManager)this.getTransactionManager()).getTransactionManager()+","+((JtaTransactionManager)this.getTransactionManager()).getTransactionManager().getStatus()+", this.isSessionTransacted()="+this.isSessionTransacted());
            log.info("STATUS_ACTIVE="+Status.STATUS_ACTIVE);
            log.info("STATUS_COMMITTEDE="+Status.STATUS_COMMITTED);
            log.info("STATUS_COMMITTING="+Status.STATUS_COMMITTING);
            log.info("STATUS_MARKED_ROLLBACK="+Status.STATUS_MARKED_ROLLBACK);
            log.info("STATUS_NO_TRANSACTION="+Status.STATUS_NO_TRANSACTION);
            log.info("STATUS_PREPARED="+Status.STATUS_PREPARED);
            log.info("STATUS_PREPARING="+Status.STATUS_PREPARING);
            log.info("STATUS_ROLLEDBACK="+Status.STATUS_ROLLEDBACK);
            log.info("STATUS_ROLLING_BACK="+Status.STATUS_ROLLING_BACK);
            log.info("STATUS_UNKNOWN="+Status.STATUS_UNKNOWN);



        } catch (SystemException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

    public void forceRollback() {
        try {
            ((JtaTransactionManager)this.getTransactionManager()).getTransactionManager().setRollbackOnly();
        } catch (IllegalStateException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (SecurityException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (SystemException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

更新数据库并将消息发送到传出队列后,我故意抛出RuntimeException只是为了测试数据库和消息代理的事务回滚。

[三个操作在成功的情况下都被提交,但是只有在失败的情况下才回滚数据库操作,而两个JMS操作仍然都被提交。

可能是:

  • 我的tomee.xml(尤其是队列连接工厂)或其他地方的设置错误
  • 一个错误?

我已经花了很多时间来解决问题并寻找可能的解决方案。

很高兴听到您对此的意见,如果事实证明这是我的错,再次表示歉意。

activemq tomee tomee-7 apache-tomee tomee-8
1个回答
0
投票

我相信您需要使用ActiveMQ JCA资源适配器来确保将连接自动注册到XA事务中。试试这个:

<tomee>
    <Resource id="MyJmsResourceAdapter" type="ActiveMQResourceAdapter">
        # Do not start the embedded ActiveMQ broker
        BrokerXmlConfig  =
        ServerUrl = tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0
    </Resource>

    <Resource id="jms/MyIncomingConnFactory" type="javax.jms.ConnectionFactory">
        resourceAdapter = MyJmsResourceAdapter
        transactionSupport = xa
    </Resource>

    <Resource id="jms/MyOutgoingConnFactory" type="javax.jms.ConnectionFactory">
        resourceAdapter = MyJmsResourceAdapter
        transactionSupport = xa
    </Resource>

    <Resource id="jms/MyOutgoingQueue" type="javax.jms.Queue"/>
    <Resource id="jms/MyIncomingQueue" type="javax.jms.Queue"/>

    <Resource id="jdbc/myDBXAPooled" type="DataSource">
        XaDataSource myDBXA
        DataSourceCreator dbcp
        JtaManaged true
        UserName TestUser
        Password TestPassword
        MaxWait 2000
        ValidationQuery SELECT 1    
        MaxActive 15
    </Resource> 

    <Resource id="myDBXA" type="XADataSource" class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
        Url jdbc:mysql://localhost:3306/test
        User TestUser
        Password TestPassword
    </Resource>
</tomee>
© www.soinside.com 2019 - 2024. All rights reserved.