我的应用程序队列(RabbitMQ)中有一个作业列表。
其中一些作业是分组的,并且必须按照顺序来做(不是连续的,而是按照派遣时间的顺序)。
例如,考虑到队列中有4个作业,我必须确保 "x "组的第一个作业在第3个作业(同组)之前成功执行。
[
{ "group": "x", "dispatched_timestamp": 10001, "field1": "some data", "field2": "some other data"},
{ "group": "g", "dispatched_timestamp": 10005,"field1": "some data", "field2": "some other data"},
{ "group": "x", "dispatched_timestamp": 10005,"field1": "some data", "field2": "some other data"},
{ "group": "t", "dispatched_timestamp": 10005,"field1": "some data", "field2": "some other data"}
]
我必须确保 "x "组的第一个作业在第三个作业(同一组)之前成功执行,但我不在乎第四个作业是否比第一个作业早执行(或其他什么),因为有时会发生三个作业都交付给3个消费者,但第一个作业由于某种原因失败了(但第二个和第三个作业成功了)。
因为有时可能会发生三个作业都交付给3个消费者,但第一个作业由于某种原因失败了(但第二个和第三个作业已经成功了)。
我知道在这种情况下,会有一些情况,即队列中的所有作业都属于同一组,所以多个消费者不能对它们进行操作,必须一个一个地交付。
在AMQ协议中,没有这样的东西可以导致这种精确的解决方案,有一些方法可以解决这个问题。
让我引用文档中的信息排序
AMQP 0-9-1核心规范的4.7节解释了保证排序的条件:在一个通道中发布的消息,通过一个交换和一个队列以及一个出站通道,将按照发送的顺序接收。RabbitMQ自2.7.0版本以来提供了更强的保证。
参考文献 https:/www.rabbitmq.comsemantics.html
首先对你来说最重要的是保存消息的排序,一旦我们对消息进行了排序,我们就可以利用并发来按顺序处理消息。
假设你的队列有5条消息,如图所示。
Queue: Queue1
+--------------+
Head-->|m1|m2|m3|m4|m5| <---- Tail
+--------------+
有竞争消费者的概念,竞争消费者是指同一个队列有多个消费者订阅者。如果有多个消费者比每个消费者都会自主运行,这意味着消费者端的排序不会被保留。为了保留消费者侧的排序,我们不应该使用竞争消费者。
即使现在消费者不竞争,但如果我们有一个以上的执行者,我们仍然会失去消息排序。多于一个执行者简单的说就是我们可以轮询队列,向任何一个执行者发送轮询的消息。基于CPU的执行策略等,我们仍然会失去排序,所以现在我们需要将执行器的数量限制为1。
由于我们只有一个执行者,每个轮询的消息都会按顺序执行,所以会变成一个串行执行。
对于队列1
执行者将按以下顺序消耗该消息
-> m1
-> m2
-> m3
-> m4
-> m5
不过,还差一个环节,如果执行的是 m1
是失败的?你可以重试 N
消耗下一条消息之前的次数,为了实现这一点,除非你成功地执行了任何轮询的消息,否则不要承认。
从设计的角度来看,这看起来并不好,因为你是以串行而不是并行的方式处理消息,尽管你没有任何其他选择。