对相同的请求负载进行顺序处理,对不同的负载进行并行处理

问题描述 投票:0回答:1

我有一个用 springboot 应用程序编写的 Kafka 消费者,它使用名为 matchupdates 的 Kafka 主题的有效负载,处理它并将其保存到数据库。由于我收到了有关比赛的大量更新,因此可以使用 matchId 来识别相应的有效负载。
考虑以下是 springboot 服务消耗的有效负载,其中 M1,M2 只是 matchId,相应的更新表示字段。

M1 ->(更新1)

{matchid,name,....}
M1 ->(更新2)
{matchid,name,....}
M1 ->(更新3)
{matchid,name,....}
M2 ->(更新1)
{matchid,name,....}
M2 ->(更新2)
{matchid,name,....}

这里我的要求是,**相同的有效负载应该仅顺序消耗**(即matchId M1的update1,update2,update3应该顺序处理(即处理,并更新数据库,以及**如果任何操作花费更长的时间)执行,然后其余的更新应该在队列中等待**),而M1的update1和M2的update2可以并行处理

我可以使用服务执行器或 Java 中的任何其他方式来实现此目的,通过这些方式我可以仅顺序处理相同的有效负载,并且如果需要,它们需要在队列中等待,直到不处理前面的更新。

我尝试将线程池执行器与队列一起使用,但每次它都会分配给池中可用的不同线程,这会导致有效负载与相同的 MatchId 并行执行。

java spring-boot multithreading
1个回答
0
投票

在 Java 中,我们有 BlockingQueue https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html 可以帮助你。创建与 M1、M2... 数量一样多的阻塞队列。假设我们知道。如果没有,我们应该考虑如何重用我们的队列。当来自主题的消息被消费时,读取其 matchid 并将消息分派到相应的队列。调度程序在一个线程中执行并按顺序处理消息。它工作得很快,因为从队列中放入消息和读取消息不需要太多时间。主要时间花在处理消息和数据库交互上。因此,我们创建的线程数量与我们拥有的队列数量相同。您可以使用固定线程池或创建线程作为新线程,重要的是每个线程必须为其自己的队列提供服务。通过这种方式,它将按顺序处理具有相同 matchid 的每条消息,但写入数据库将并行

© www.soinside.com 2019 - 2024. All rights reserved.