Polling thread for jdbc:inbound-channel-adapter rolls back the transaction, but the MQ messages have already been sent


I am using a jdbc:inbound-channel-adapter configured as transactional to poll 50 DB records at a time. This means all 50 records are processed in one (the same) transaction, which is what I want.

In the sample configuration, ProcessIndividualRecord::updateDB() updates the status in the database, and then notificationJMSChannel sends a message to MQ.

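If an exception occurs in the updateDB method while processing the 50th record, the framework rolls back the 49 records that were already updated, and control passes to ProcessorExceptionHandler.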

My problem is that notificationJMSChannel has already sent the messages for the previous 49 records to MQ.

How can I roll those back?

<int-jdbc:inbound-channel-adapter id="initial.poller"
        query="${poller.get}"
        update="${poller.update}"
        max-rows="${poller.maxRow:50}"
        row-mapper="pollerRowMapper"
        data-source="dataSource"
        channel="deliveryContactTypeChannel">
    <int:transactional/>
</int-jdbc:inbound-channel-adapter>

<int:channel id="deliveryContactTypeChannel" />

<int:splitter id="splitDeliveryContactType"
        ref="deliveryContactTypeMessageProcessor"
        method="handleJdbcMessage"
        input-channel="deliveryContactTypeChannel"
        output-channel="processIndividualRecordChannel" />

<int:service-activator id="statusUpdate"
        input-channel="processIndividualRecordChannel"
        output-channel="notificationJMSChannel"
        ref="processIndividualRecord"
        method="updateDB" />

<!-- send MQ message -->
<int:channel id="notificationJMSChannel"></int:channel>

<int-jms:outbound-channel-adapter id="jmsOut"
        channel="notificationJMSChannel"
        destination="senderTopic"
        jms-template="jmsQueueTemplate">
</int-jms:outbound-channel-adapter>

<!-- Error handling -->
<int:channel id="jmsErrorChannel" />

<int:header-enricher id="errorMsg.HeaderEnricher"
        input-channel="errorChannel"
        output-channel="jmsErrorChannel">
    <int:header name="failpayload" expression="payload.failedMessage" />
</int:header-enricher>

<int:service-activator input-channel="jmsErrorChannel" method="handleException">
    <bean class="com.digital.alerts.integration.ProcessorExceptionHandler" />
</int:service-activator>

spring-integration
1 Answer
You need to consider using a JtaTransactionManager if you are running in a JEE container, or chaining the DataSource and JMS transaction managers: https://www.javaworld.com/article/2077963/distributed-transactions-in-spring--with-and-without-xa.html

See the ChainedTransactionManager implementation in Spring Data:

/**
 * {@link PlatformTransactionManager} implementation that orchestrates transaction creation, commits and rollbacks to a
 * list of delegates. Using this implementation assumes that errors causing a transaction rollback will usually happen
 * before the transaction completion or during the commit of the most inner {@link PlatformTransactionManager}.
 * <p />
 * The configured instances will start transactions in the order given and commit/rollback in <em>reverse</em> order,
 * which means the {@link PlatformTransactionManager} most likely to break the transaction should be the <em>last</em>
 * in the list configured. A {@link PlatformTransactionManager} throwing an exception during commit will automatically
 * cause the remaining transaction managers to roll back instead of committing.
 *
 * @author Michael Hunger
 * @author Oliver Gierke
 * @since 1.6
 */
public class ChainedTransactionManager implements PlatformTransactionManager {
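
To make the ordering described in that Javadoc concrete, here is a minimal plain-Java sketch of the same idea: begin in the given order, complete in reverse order, and roll back whatever remains once anything fails. It is not the Spring Data implementation. `ChainedTxSketch`, `Resource`, `LoggingResource`, `runInChain`, and the labels "jdbc" and "jms" are all hypothetical names invented for illustration, and the order of the two resources in the chain is arbitrary here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class ChainedTxSketch {

    /** Hypothetical stand-in for a transactional resource; not a Spring type. */
    public interface Resource {
        void begin();
        void commit();
        void rollback();
    }

    /** Resource that records each call so the ordering can be inspected. */
    public static final class LoggingResource implements Resource {
        private final String name;
        private final List<String> log;

        public LoggingResource(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override public void begin()    { log.add(name + ":begin"); }
        @Override public void commit()   { log.add(name + ":commit"); }
        @Override public void rollback() { log.add(name + ":rollback"); }
    }

    /**
     * Begins the resources in the given order, runs the work, then completes
     * them in reverse order. Returns true only if every resource committed.
     */
    public static boolean runInChain(List<Resource> resources, Runnable work) {
        Deque<Resource> started = new ArrayDeque<>();
        for (Resource r : resources) {
            r.begin();
            started.push(r); // the last resource started ends up on top
        }

        boolean commit = true;
        try {
            work.run();
        } catch (RuntimeException e) {
            commit = false; // a failure in the work rolls everything back
        }

        while (!started.isEmpty()) {
            Resource r = started.pop();
            if (commit) {
                try {
                    r.commit();
                } catch (RuntimeException e) {
                    commit = false; // remaining resources roll back instead
                }
            } else {
                r.rollback();
            }
        }
        return commit;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Resource> chain = Arrays.asList(
                new LoggingResource("jdbc", log),
                new LoggingResource("jms", log));

        // Simulate updateDB failing part-way through the batch.
        boolean committed = runInChain(chain, () -> {
            throw new IllegalStateException("updateDB failed");
        });

        // prints: false [jdbc:begin, jms:begin, jms:rollback, jdbc:rollback]
        System.out.println(committed + " " + log);
    }
}
```

Note the weakness the Javadoc hints at: if an earlier commit in the reverse pass succeeds and a later one fails, the earlier one stays committed. That is why the resource most likely to fail should be placed last, so that it commits first.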

Right now it looks like each message is sent in its own local JMS transaction, which is completely unaware that you have an outer JDBC transaction.
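
The effect can be reproduced with a small in-memory simulation, assuming a batch of 50 records where the 50th fails as in the question. This is only a sketch: `DeferredSendSketch` and `processBatch` are hypothetical names, and the two lists stand in for the broker and for a send that is held back until the outer transaction completes. No real JMS or Spring API is involved.

```java
import java.util.ArrayList;
import java.util.List;

public class DeferredSendSketch {

    /**
     * Processes records 1..size and returns the messages the (in-memory,
     * hypothetical) broker ends up delivering. A failAt value outside
     * 1..size means no record fails.
     */
    public static List<String> processBatch(int size, int failAt, boolean sendInOwnTransaction) {
        List<String> delivered = new ArrayList<>(); // visible to consumers
        List<String> pending = new ArrayList<>();   // held until the outer commit

        try {
            for (int i = 1; i <= size; i++) {
                if (i == failAt) {
                    throw new IllegalStateException("updateDB failed on record " + i);
                }
                String message = "record-" + i;
                if (sendInOwnTransaction) {
                    delivered.add(message); // committed at once, cannot be recalled
                } else {
                    pending.add(message);   // joins the outer transaction
                }
            }
            delivered.addAll(pending);      // outer commit publishes everything
        } catch (IllegalStateException e) {
            pending.clear();                // outer rollback discards unsent messages
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(processBatch(50, 50, true).size());  // prints 49
        System.out.println(processBatch(50, 50, false).size()); // prints 0
    }
}
```

With each send committed on its own, 49 messages escape even though the database work is rolled back. When the sends are tied to the outer transaction, the failure on the 50th record leaves nothing delivered.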