我之前创建了一个集成流程,该流程使用带有 updateExpression 的 MongoDb inboundChannelAdapter 并将其设置为事务性:
IntegrationFlow
.from(MongoDb.inboundChannelAdapter(mongoTemplate, "{'status' : 'READY'}")
.collectionName("work").entityClass(Document.class)
.update(Update.update("status", "P")),
p -> p.poller(pm -> pm.fixedDelay(1000L).transactional()))
这个轮询器将获取文档列表,对它们进行写锁定,对它们执行
split()
。然后开始加工。如果一切顺利,事务将在流程结束时提交,其中 outboundGateway 将记录设置为“完成”。
事务和 updateExpression() 的目的是防止其他轮询线程获取相同的记录并处理它们(如果它们已经被处理)。这是可行的,但如果该列表中的任何单个文档已经具有写锁,则整个轮询将失败,因为它是整个列表上的一个事务。
我的代码的下一次迭代是尝试在每个单独文档的 split() 之后获取事务。这样,如果仅对一个文档的写锁定尝试失败,它就可以转到列表中的下一个文档并尝试对该文档进行写锁定。更进一步,每个单独的文档可以在单独的线程上并行处理,每个线程都有自己的事务。我的尝试看起来像这样:
@Bean
public IntegrationFlow mongoFlowTxnPerDoc(MongoTemplate mongoTemplate, TransactionManager tm) {
return IntegrationFlow
.from(MongoDb.inboundChannelAdapter(mongoTemplate, "{'status' : 'TREADY'}")
.collectionName("work").entityClass(Document.class),
p -> p.poller(pm -> pm.fixedRate(1000L)))
.split()
.channel(c -> c.executor("txProcess.input", Executors.newFixedThreadPool(3)))
.get();
}
这应该会获得记录列表,而无需尝试更新任何内容。相反,它被发送到执行者通道。
在这里,我在 outboundGateway 上使用
e -> e.transactional(true)
,它在单个文档上启动 findOneAndUpdate
。
@Bean
public IntegrationFlow txProcess(MongoTemplate mongoTemplate) {
return f -> f
.handle(MongoDb.outboundGateway(mongoTemplate).collectionName("work").entityClass(Document.class)
.collectionCallback((c, m) ->
c.findOneAndUpdate(Filters.eq("uuid", ((Document) m.getPayload()).get("uuid")),
Updates.set("status", "DONE"))
), e -> e.transactional(true)
)
.<Document>handle((p, h) -> {
System.out.println("-----PROCESSING-----");
System.out.println(p);
// simulate something that takes time
try {Thread.sleep(10000);} catch (InterruptedException e) {}
return null;
});
}
但是我注意到奇怪的行为,并且不太确定发生了什么或我做错了什么。
如果我插入一两条消息,它似乎会按预期运行。输出显示它正在处理的消息,并且它最终提交状态现在为 DONE 的事务。我还看到每秒来自轮询器的 WriteConflict 错误,这是可以预料的,因为集合中只有这两条记录,并且没有其他记录可以继续处理。输出看起来像这样:
-----PROCESSING-----
Document{{_id=65bc01015a057207a14e2ba6, status=TREADY, message=hello, uuid=345eb720-5a6b-4dda-ab4e-24ac8a31b67a}}
-----PROCESSING-----
Document{{_id=65bc01015a057207a14e2ba7, status=TREADY, message=hello, uuid=230ad5d3-4c21-40ff-b75d-05a227d34fec}}
如果我插入三个或更多记录,轮询 WriteConflict 错误将停止。就好像轮询器不再在某些东西上阻塞一样运行。正在处理的记录的初始输出看起来不错:
-----PROCESSING-----
Document{{_id=65bc01015a057207a14e2ba6, status=TREADY, message=hello, uuid=345eb720-5a6b-4dda-ab4e-24ac8a31b67a}}
-----PROCESSING-----
Document{{_id=65bc01015a057207a14e2ba7, status=TREADY, message=hello, uuid=230ad5d3-4c21-40ff-b75d-05a227d34fec}}
-----PROCESSING-----
Document{{_id=65bc00ff5a057207a14e2ba4, status=TREADY, message=hello, uuid=31988399-7b1e-4f5e-832f-53a6baa7883d}}
然而,这些完成后,奇怪的事情发生了。该流程陷入循环,不断处理已完成的相同消息。状态现在为 DONE 而不是 TREADY 表明了这一点:
-----PROCESSING-----
Document{{_id=65bc01015a057207a14e2ba6, status=DONE, message=hello, uuid=345eb720-5a6b-4dda-ab4e-24ac8a31b67a}}
-----PROCESSING-----
Document{{_id=65bc01015a057207a14e2ba7, status=DONE, message=hello, uuid=230ad5d3-4c21-40ff-b75d-05a227d34fec}}
-----PROCESSING-----
Document{{_id=65bc00ff5a057207a14e2ba4, status=DONE, message=hello, uuid=31988399-7b1e-4f5e-832f-53a6baa7883d}}
这将永远持续下去,直到我终止该应用程序。我想也许交易保持开放状态并且在流程结束时没有提交。或者也许是别的什么。
编辑:它的交易部分可能是转移注意力的部分。即使我删除了
transactional(true)
,它仍然开始在 DONE 消息上循环。我一定是滥用了执行者通道。
EDIT2:我认为发生的情况是,当记录被重新轮询并提交到固定线程池执行器时,它们被插入到阻塞队列中。如果重新池化的文档在队列中保留的时间足够长,足以让原始线程完成并将其提交为 DONE,则在传递到
findOneAndUpdate
时,它不会收到 WriteLock 错误。另外,当队列最终达到容量并开始阻止轮询线程再添加时,这就是为什么我认为轮询似乎暂停了一段时间。不回答只是为了了解其他人的观点。
我想你的解决方案一定喜欢这样:
@Bean
public IntegrationFlow mongoFlowTxnPerDoc(MongoTemplate mongoTemplate, TransactionManager tm) {
return IntegrationFlow
.from(MongoDb.inboundChannelAdapter(mongoTemplate, "{'status' : 'TREADY'}")
.collectionName("work").entityClass(Document.class)
.update(Update.update("status", "P")),
p -> p.poller(pm -> pm.fixedDelay(1000L).transactional()))
.channel(c -> c.executor(SOME_EXECUTOR)
.split()
.channel(c -> c.executor("txProcess.input", Executors.newFixedThreadPool(3)))
.get();
在完成此通道适配器中的当前设置之前,请勿使用
fixedRate()
不轮询新记录。 fixedRate
为已经运行的并行启动新的轮询任务。
当您将 MongoDB 通道适配器的结果放入新的执行器通道时,拆分器将强制提交原始事务。只是因为我们离开了当前线程。
这样刚刚拉取的文档将被更新,以避免它们进入下一个轮询周期。
处理每条消息的其余逻辑都可以。最后,您可能仍然会在
transactional(true)
上看到 MongoDb.outboundGateway()
。