Rabbitmq集群,当节点离线时,如何保证离线队列收到来自扇出交换机的消息?

问题描述 投票:0回答:1

我有三个节点集群,两个队列绑定到一个扇出交换机。我的要求是所有发送到exchange的消息都必须保存到这两个队列中,所有消息不能丢失并且必须得到处理。

当其中一个节点离线时,该节点上的队列将丢失交换机收到的消息。我也许可以使用仲裁队列,但这只能允许一个节点离线。如果两个节点离线,也会出现同样的问题。有什么解决办法吗?

rabbitmq amqp
1个回答
0
投票

您可以只使用一个直接队列。比你的 3 个节点将一条一条地处理消息,这样会更快。您还可以使用这些标头为有错误的消息创建一个接收队列:

async function initBaseQueues(
  ch: amqplib.Channel,
  hash: Record<string, string>,
) {
  const queues = Object.keys(hash);

  await ch.assertExchange(FACTORY_GATEWAY_EXCHANGE, 'direct');

  for (let i = 0; i < queues.length; i++) {
    const queue = queues[i];

    const routingKey = hash[queue];

    await ch.assertQueue(queue, {
      autoDelete: false,
      durable: true,
      arguments: {
        [ERabbitQueueArguments.deadLetterExchange]:
          FACTORY_GATEWAY_EXCHANGE + RETRY_CONST,
        [ERabbitQueueArguments.deadLetterRoutingKey]: routingKey + RETRY_CONST,
      },
    });

    await ch.bindQueue(queue, FACTORY_GATEWAY_EXCHANGE, routingKey);

    Logger.log(`Queue ${queue} initialized`);
  }
}

async function initRetryQueues(
  ch: amqplib.Channel,
  hash: Record<string, string>,
) {
  const queues = Object.keys(hash);

  await ch.assertExchange(FACTORY_GATEWAY_EXCHANGE + RETRY_CONST, 'direct');

  for (let i = 0; i < queues.length; i++) {
    const queue = queues[i];

    const routingKey = hash[queue];

    await ch.assertQueue(queue + RETRY_CONST, {
      autoDelete: false,
      durable: true,
      arguments: {
        [ERabbitQueueArguments.deadLetterExchange]: FACTORY_GATEWAY_EXCHANGE,
        [ERabbitQueueArguments.deadLetterRoutingKey]: routingKey,
        [ERabbitQueueArguments.messageTTL]: REQUEUE_DEALAY_CONST,
      },
    });

    await ch.bindQueue(
      queue + RETRY_CONST,
      FACTORY_GATEWAY_EXCHANGE + RETRY_CONST,
      routingKey + RETRY_CONST,
    );

    Logger.log(`Queue ${queue + RETRY_CONST} initialized`);
  }
}

export default async function initRabbitQueues() {
  const conn = await amqplib.connect(
    process.env.RABBIT_HOST ?? 'amqp://localhost:5672',
  );

  const ch1 = await conn.createChannel();

  const hash = queueNames();

  await initBaseQueues(ch1, hash);

  await initRetryQueues(ch1, hash);

  await conn.close();
}

标题所在:

export enum ERabbitQueueArguments {
  deadLetterExchange = 'x-dead-letter-exchange',
  deadLetterRoutingKey = 'x-dead-letter-routing-key',
  messageTTL = 'x-message-ttl',
}
© www.soinside.com 2019 - 2024. All rights reserved.