IntegrationFlow 可分割集合并为每个消息创建一个事务

问题描述 投票:0回答:1

我之前创建了一个集成流程,该流程使用带有 updateExpression 的 MongoDb inboundChannelAdapter 并将其设置为事务性:

IntegrationFlow
                .from(MongoDb.inboundChannelAdapter(mongoTemplate, "{'status' : 'READY'}")
                    .collectionName("work").entityClass(Document.class)
                        .update(Update.update("status", "P")), 
                        p -> p.poller(pm -> pm.fixedDelay(1000L).transactional()))

这个轮询器将获取文档列表,对它们进行写锁定,对它们执行

split()
。然后开始加工。如果一切顺利,事务将在流程结束时提交,其中 outboundGateway 将记录设置为“完成”。

事务和 updateExpression() 的目的是防止其他轮询线程获取相同的记录并处理它们(如果它们已经被处理)。这是可行的,但如果该列表中的任何单个文档已经具有写锁,则整个轮询将失败,因为它是整个列表上的一个事务。

我的代码的下一次迭代是尝试在每个单独文档的 split() 之后获取事务。这样,如果仅对一个文档的写锁定尝试失败,它就可以转到列表中的下一个文档并尝试对该文档进行写锁定。更进一步,每个单独的文档可以在单独的线程上并行处理,每个线程都有自己的事务。我的尝试看起来像这样:

    @Bean
    public IntegrationFlow mongoFlowTxnPerDoc(MongoTemplate mongoTemplate, TransactionManager tm) {
        return IntegrationFlow
                .from(MongoDb.inboundChannelAdapter(mongoTemplate, "{'status' : 'TREADY'}")
                    .collectionName("work").entityClass(Document.class), 
                        p -> p.poller(pm -> pm.fixedRate(1000L)))
                .split()
                .channel(c -> c.executor("txProcess.input", Executors.newFixedThreadPool(3)))
                .get();
}

这应该会获得记录列表,而无需尝试更新任何内容。相反,它被发送到执行者通道。

在这里,我在 outboundGateway 上使用

e -> e.transactional(true)
,它在单个文档上启动
findOneAndUpdate

    @Bean
    public IntegrationFlow txProcess(MongoTemplate mongoTemplate) {
        return f -> f
            .handle(MongoDb.outboundGateway(mongoTemplate).collectionName("work").entityClass(Document.class)
                .collectionCallback((c, m) -> 
                    c.findOneAndUpdate(Filters.eq("uuid", ((Document) m.getPayload()).get("uuid")),
                            Updates.set("status", "DONE"))
                ), e -> e.transactional(true)
            )
            .<Document>handle((p, h) -> {
                System.out.println("-----PROCESSING-----");
                System.out.println(p);
                // simulate something that takes time
                try {Thread.sleep(10000);} catch (InterruptedException e) {}
                return null;
            });
    }

但是我注意到奇怪的行为,并且不太确定发生了什么或我做错了什么。

如果我插入一两条消息,它似乎会按预期运行。输出显示它正在处理的消息,并且它最终提交状态现在为 DONE 的事务。我还看到每秒来自轮询器的 WriteConflict 错误,这是可以预料的,因为集合中只有这两条记录,并且没有其他记录可以继续处理。输出看起来像这样:

-----PROCESSING-----
Document{{_id=65bc01015a057207a14e2ba6, status=TREADY, message=hello, uuid=345eb720-5a6b-4dda-ab4e-24ac8a31b67a}}
-----PROCESSING-----
Document{{_id=65bc01015a057207a14e2ba7, status=TREADY, message=hello, uuid=230ad5d3-4c21-40ff-b75d-05a227d34fec}}

如果我插入三个或更多记录,轮询 WriteConflict 错误将停止。就好像轮询器不再在某些东西上阻塞一样运行。正在处理的记录的初始输出看起来不错:

-----PROCESSING-----
Document{{_id=65bc01015a057207a14e2ba6, status=TREADY, message=hello, uuid=345eb720-5a6b-4dda-ab4e-24ac8a31b67a}}
-----PROCESSING-----
Document{{_id=65bc01015a057207a14e2ba7, status=TREADY, message=hello, uuid=230ad5d3-4c21-40ff-b75d-05a227d34fec}}
-----PROCESSING-----
Document{{_id=65bc00ff5a057207a14e2ba4, status=TREADY, message=hello, uuid=31988399-7b1e-4f5e-832f-53a6baa7883d}}

然而,这些完成后,奇怪的事情发生了。该流程陷入循环,不断处理已完成的相同消息。状态现在为 DONE 而不是 TREADY 表明了这一点:

-----PROCESSING-----
Document{{_id=65bc01015a057207a14e2ba6, status=DONE, message=hello, uuid=345eb720-5a6b-4dda-ab4e-24ac8a31b67a}}
-----PROCESSING-----
Document{{_id=65bc01015a057207a14e2ba7, status=DONE, message=hello, uuid=230ad5d3-4c21-40ff-b75d-05a227d34fec}}
-----PROCESSING-----
Document{{_id=65bc00ff5a057207a14e2ba4, status=DONE, message=hello, uuid=31988399-7b1e-4f5e-832f-53a6baa7883d}}

这将永远持续下去,直到我终止该应用程序。我想也许交易保持开放状态并且在流程结束时没有提交。或者也许是别的什么。

编辑:它的交易部分可能是转移注意力的部分。即使我删除了

transactional(true)
,它仍然开始在 DONE 消息上循环。我一定是滥用了执行者通道。

EDIT2:我认为发生的情况是,当记录被重新轮询并提交到固定线程池执行器时,它们被插入到阻塞队列中。如果重新池化的文档在队列中保留的时间足够长,足以让原始线程完成并将其提交为 DONE,则在传递到

findOneAndUpdate
时,它不会收到 WriteLock 错误。另外,当队列最终达到容量并开始阻止轮询线程再添加时,这就是为什么我认为轮询似乎暂停了一段时间。不回答只是为了了解其他人的观点。

mongodb spring-boot spring-integration spring-integration-dsl
1个回答
0
投票

我想你的解决方案一定喜欢这样:

@Bean
public IntegrationFlow mongoFlowTxnPerDoc(MongoTemplate mongoTemplate, TransactionManager tm) {
    return IntegrationFlow
            .from(MongoDb.inboundChannelAdapter(mongoTemplate, "{'status' : 'TREADY'}")
                .collectionName("work").entityClass(Document.class)
                .update(Update.update("status", "P")), 
                    p -> p.poller(pm -> pm.fixedDelay(1000L).transactional()))
            .channel(c -> c.executor(SOME_EXECUTOR)
            .split()
            .channel(c -> c.executor("txProcess.input", Executors.newFixedThreadPool(3)))
            .get();
  1. 在完成此通道适配器中的当前设置之前,请勿使用

    fixedRate()
    不轮询新记录。
    fixedRate
    为已经运行的并行启动新的轮询任务。

  2. 当您将 MongoDB 通道适配器的结果放入新的执行器通道时,拆分器将强制提交原始事务。只是因为我们离开了当前线程。

  3. 这样刚刚拉取的文档将被更新,以避免它们进入下一个轮询周期。

  4. 处理每条消息的其余逻辑都可以。最后,您可能仍然会在

    transactional(true)
    上看到
    MongoDb.outboundGateway()

© www.soinside.com 2019 - 2024. All rights reserved.