NodeJS amqp:是否可以在通道关闭操作完成之前调用 channel.close() 方法后立即使用消息?

问题描述 投票:0回答:1

我有一个 nodejs 应用程序,它使用

amqp
从远程服务器监听 RabbitMQ 队列,并使用队列中的作业。

    // Connect to the rabbitMQ server and create a channel
    const connection = await amqplib.connect(RabbitMQ_URL);
    const channel = await connection.createChannel();
    channel.prefetch(1);

    // Assert the queue
    await channel.assertQueue(JOB_QUEUE_NAME, { durable: true });

    // start the ShutDownCounter;
    shutDownCounter.start();

    // Consume the queue
    channel.consume(
      JOB_QUEUE_NAME,
      async message => {

        const task: Task = JSON.parse(message.content.toString());

        // Process the task
        try {
          taskProgress = await processTask(task);
        } catch (error) {
          console.log(error);
        }

        // Acknowledge the message
        channel.ack(message);
      }
    );

在其他地方,在 120 秒没有从远程队列中消费任何新消息后,我调用

channel.close()
函数,然后关闭应用程序;

这种情况会发生吗:

  1. 在时间

    t = 120.001
    ,调用
    channel.close()
    调用;但频道还活着;

  2. 时间

    t = 120.003
    ,一条新消息(任务)刚刚到达!
    channel.consume()
    接受消息。 (当然这样的事情是注定要发生的);

  3. 时间

    t = 120.010
    channel.close()
    调用完成,通道实际关闭。

  4. 在时间

    t = 120.100
    ,应用程序被关闭,即使它实际上正在处理任务。

如何避免这种情况?

node.js multithreading asynchronous rabbitmq amqp
1个回答
0
投票

在这里回答自己...

我发现根本不需要担心。它有据可查 RabbitMQ 自动重新排队

当消费者失败或失去连接时:自动重新排队

当使用手动确认时,任何交付(消息)被 not acked 在通道(或连接)时自动重新排队 发生交货的地方已关闭。这包括 TCP 连接 客户损失、消费者应用程序(进程)故障,以及 通道级协议异常(见下文)。

...

由于这种行为,消费者必须准备好处理 重新交付和以其他方式实现幂等性。 重新交付将有一个特殊的布尔属性,重新交付,设置为 RabbitMQ 为真。对于首次交付,它将设置为 false。 请注意,消费者可以收到一条消息,该消息之前 交付给另一个消费者。

我只需要确保在我的应用程序中需要手动确认,然后即使发生这种边缘情况,消息也不会丢失。

© www.soinside.com 2019 - 2024. All rights reserved.