我正在尝试在 SQS 队列的使用者中实现序列化,该队列在多个 NodeJs 实例中运行。也就是说,我想做的是确保事情按照它们进来的顺序完成,一次一件事。我最初的想法是在 documentDb (mongo)(服务主存储库)中使用可能使用锁定文档的某种信号量机制,通过使用更改流机制等待对该文档的更改。每个实例都有自己的标识符,锁文件将包含拥有锁的人的 ID。不确定这是否是实现这一目标的最佳方法,所以我想问,是否有更好的方法来实现这一目标?或者稍微修改一下就能工作吗?我主要担心的是实例可能会挨饿,这会破坏所有这一切的目的,因为如果幸运的话,一个实例可能会连续两次获得锁。还可以放置一个序列计数器,每个实例跟踪它们在序列中的位置,当它们尝试获取锁时,它们可以将它们的序列计数器与锁上的内容进行比较,如果它们在,则将继续等待在后面。尽管这看起来非常脆弱,并且如果一个实例未完成序列步骤而死亡,则可能会拖延一切。有什么想法吗?
代码看起来像这样
const LockSchema = new Schema({
id: String,
owner: String,
createdAt: { type: Date, required: true, default: Date.now },
});
LockSchema.index({ createdAt: 1 }, { expireAfterSeconds: 600 });
const Lock = mongoose.model("queue-lock", LockSchema, "locks");
const filter = [{
$match: {
{ operationType: "delete" }
}
}];
const options = { fullDocument: 'updateLookup' };
let lockStream = Lock.watch(filter, options)
lockStream.on('change', event => {
if (!event.fullDocument) {// deleted
result = await Lock.findOneAndUpdate(
{ id: "migration-lock" },
{ $setOnInsert: { owner: whoAmI } },
{ upsert: true, new: true }
).lean();
if (result.owner === whoAmI) {
// proud owner of lock, do stuff
await Lock.deleteOne({ id: "queue-lock" }); // when done
} else {
// keep waiting
}
}
});
我想做的是确保事情按照它们进来的顺序完成,一次一件事
您可以使用先进先出队列。
发布到 FIFO 队列中同一组的消息按照发布顺序读取。在从队列中删除已请求的消息之前,不会发布来自同一组的更多消息。
我主要担心的是实例可能会挨饿,这会破坏所有这一切的目的,因为如果幸运的话,一个实例可能会连续两次获得锁。
您关心消息的顺序或哪个实例处理什么内容吗?
同一队列中同一组中的消息假定处理它们的方式并不重要。
如果您有本质上不同的实例(具有不同的计算能力、不同的规格等),那么您应该将消息路由到不同的队列中。
如果处理的顺序很重要,您应该为您的消息引入临时依赖关系(仅在处理其始发者之后发布后续处理消息)。