我有一个 nodejs 应用程序,它使用
amqp
从远程服务器监听 RabbitMQ 队列,并使用队列中的作业。
// Connect to the rabbitMQ server and create a channel
const connection = await amqplib.connect(RabbitMQ_URL);
const channel = await connection.createChannel();
channel.prefetch(1);
// Assert the queue
await channel.assertQueue(JOB_QUEUE_NAME, { durable: true });
// start the ShutDownCounter;
shutDownCounter.start();
// Consume the queue
channel.consume(
JOB_QUEUE_NAME,
async message => {
const task: Task = JSON.parse(message.content.toString());
// Process the task
try {
taskProgress = await processTask(task);
} catch (error) {
console.log(error);
}
// Acknowledge the message
channel.ack(message);
}
);
在其他地方,在 120 秒没有从远程队列中消费任何新消息后,我调用
channel.close()
函数,然后关闭应用程序;
这种情况会发生吗:
在时间
t = 120.001
,调用channel.close()
调用;但频道还活着;
时间
t = 120.003
,一条新消息(任务)刚刚到达! channel.consume()
接受消息。 (当然这样的事情是注定要发生的);
时间
t = 120.010
,channel.close()
调用完成,通道实际关闭。
在时间
t = 120.100
,应用程序被关闭,即使它实际上正在处理任务。
如何避免这种情况?
在这里回答自己...
我发现根本不需要担心。它有据可查 RabbitMQ 自动重新排队
当消费者失败或失去连接时:自动重新排队
当使用手动确认时,任何交付(消息)被 not acked 在通道(或连接)时自动重新排队 发生交货的地方已关闭。这包括 TCP 连接 客户损失、消费者应用程序(进程)故障,以及 通道级协议异常(见下文)。
...
由于这种行为,消费者必须准备好处理 重新交付和以其他方式实现幂等性。 重新交付将有一个特殊的布尔属性,重新交付,设置为 RabbitMQ 为真。对于首次交付,它将设置为 false。 请注意,消费者可以收到一条消息,该消息之前 交付给另一个消费者。
我只需要确保在我的应用程序中需要手动确认,然后即使发生这种边缘情况,消息也不会丢失。