我正在使用jdbc:inbound-channel-adapter作为transactional来轮询50 DB记录。这意味着所有50条记录将在一次(相同)交易中处理。我希望在同一笔交易中对它们进行处理。
在示例代码中,ProcessIndividualRecord :: updateDB()更新数据库中的状态,然后notificationJMSChannel将消息发送到MQ。
如果在处理第50条记录时updateDB方法中发生异常,则框架回滚所有先前更新的49记录,并且控制将<ProcessorExceptionHandler记录。
我的挑战是“notificationJMSChannel”已将先前的49记录发送给MQ。
我该如何回滚那些?<int-jdbc:inbound-channel-adapter
id="initial.poller"
query="${poller.get}"
update="${poller.update}"
max-rows="${poller.maxRow:50}"
row-mapper="pollerRowMapper"
data-source="dataSource" channel="deliveryContactTypeChannel">
<int:transactional/>
</int-jdbc:inbound-channel-adapter>
<int:channel id="deliveryContactTypeChannel" />
<int:splitter id="splitDeliveryContactType"
ref="deliveryContactTypeMessageProcessor" method="handleJdbcMessage"
input-channel="deliveryContactTypeChannel"
output-channel="processIndividualRecordChannel" />
<int:service-activator id="statusUpdate"
input-channel="processIndividualRecordChannel"
output-channel="notificationJMSChannel"
ref="processIndividualRecord" method="updateDB" />
<!-- send MQ message -->
<int:channel id="notificationJMSChannel"></int:channel>
<int-jms:outbound-channel-adapter
id="jmsOut" channel="notificationJMSChannel"
destination="senderTopic" jms-template="jmsQueueTemplate">
</int-jms:outbound-channel-adapter>
<!-- Error handling -->
<int:channel id="jmsErrorChannel" />
<int:header-enricher id="errorMsg.HeaderEnricher"
input-channel="errorChannel"
output-channel="jmsErrorChannel">
<int:header name="failpayload" expression="payload.failedMessage" />
</int:header-enricher>
<int:service-activator
input-channel="jmsErrorChannel" method="handleException">
<bean class="com.digital.alerts.integration.ProcessorExceptionHandler" />
</int:service-activator>
chain DataSource和JMS事务管理器,则需要考虑使用JtaTransactionManager
:https://www.javaworld.com/article/2077963/distributed-transactions-in-spring--with-and-without-xa.html。
ChainedTransactionManager
实现:/**
* {@link PlatformTransactionManager} implementation that orchestrates transaction creation, commits and rollbacks to a
* list of delegates. Using this implementation assumes that errors causing a transaction rollback will usually happen
* before the transaction completion or during the commit of the most inner {@link PlatformTransactionManager}.
* <p />
* The configured instances will start transactions in the order given and commit/rollback in <em>reverse</em> order,
* which means the {@link PlatformTransactionManager} most likely to break the transaction should be the <em>last</em>
* in the list configured. A {@link PlatformTransactionManager} throwing an exception during commit will automatically
* cause the remaining transaction managers to roll back instead of committing.
*
* @author Michael Hunger
* @author Oliver Gierke
* @since 1.6
*/
public class ChainedTransactionManager implements PlatformTransactionManager {
现在,看起来每条消息都是在其自己的JMS本地事务中发送的。而且这完全不知道您有外部JDBC事务。