在节点应用程序中序列化SQS fifo队列的消费者

问题描述 投票:0回答:1

我正在尝试在 SQS 队列的使用者中实现序列化,该队列在多个 NodeJs 实例中运行。也就是说,我想做的是确保事情按照它们进来的顺序完成,一次一件事。我最初的想法是在 documentDb (mongo)(服务主存储库)中使用可能使用锁定文档的某种信号量机制,通过使用更改流机制等待对该文档的更改。每个实例都有自己的标识符,锁文件将包含拥有锁的人的 ID。不确定这是否是实现这一目标的最佳方法,所以我想问,是否有更好的方法来实现这一目标?或者稍微修改一下就能工作吗?我主要担心的是实例可能会挨饿,这会破坏所有这一切的目的,因为如果幸运的话,一个实例可能会连续两次获得锁。还可以放置一个序列计数器,每个实例跟踪它们在序列中的位置,当它们尝试获取锁时,它们可以将它们的序列计数器与锁上的内容进行比较,如果它们在,则将继续等待在后面。尽管这看起来非常脆弱,并且如果一个实例未完成序列步骤而死亡,则可能会拖延一切。有什么想法吗?

代码看起来像这样

const LockSchema = new Schema({
  id: String,
  owner: String,
  createdAt: { type: Date, required: true, default: Date.now },
});

LockSchema.index({ createdAt: 1 }, { expireAfterSeconds: 600 });

const Lock = mongoose.model("queue-lock", LockSchema, "locks");


const filter = [{
    $match: {
            { operationType: "delete" }
        }
    }];

const options = { fullDocument: 'updateLookup' };

let lockStream = Lock.watch(filter, options)

lockStream.on('change', event => {
  if (!event.fullDocument) {// deleted
    result = await Lock.findOneAndUpdate(
      { id: "migration-lock" },
      { $setOnInsert: { owner: whoAmI } },
      { upsert: true, new: true }
    ).lean();
    if (result.owner === whoAmI) {
      // proud owner of lock, do stuff
      await Lock.deleteOne({ id: "queue-lock" }); // when done

    } else {
      // keep waiting
    }
  }

});
node.js mongodb amazon-sqs
1个回答
0
投票

我想做的是确保事情按照它们进来的顺序完成,一次一件事

您可以使用先进先出队列

发布到 FIFO 队列中同一组的消息按照发布顺序读取。在从队列中删除已请求的消息之前,不会发布来自同一组的更多消息。

我主要担心的是实例可能会挨饿,这会破坏所有这一切的目的,因为如果幸运的话,一个实例可能会连续两次获得锁。

您关心消息的顺序或哪个实例处理什么内容吗?

同一队列中同一组中的消息假定处理它们的方式并不重要。

如果您有本质上不同的实例(具有不同的计算能力、不同的规格等),那么您应该将消息路由到不同的队列中。

如果处理的顺序很重要,您应该为您的消息引入临时依赖关系(仅在处理其始发者之后发布后续处理消息)。

© www.soinside.com 2019 - 2024. All rights reserved.