我正在尝试设置我的第一个RabbitMQ死信交换,这是我通过Web管理界面使用的步骤:
我期待这些步骤应该通过“dead.letter.test”交换记录到“dead.letter.queue”。这不会发生。
我可以手动将消息放入“dead.letter.test”交换中,它显示在“dead.letter.queue”中,所以我知道这很好。
当我查看管理UI时,它显示在队列“test1”上设置了DLX参数。
我哪里错了?
Gentilissimo Signore非常友好地在Twitter上回答我的问题。问题是,如果您的死信交换设置为DIRECT,您必须指定一个死信路由密钥。如果您只是希望所有NACKed消息进入死信桶以供以后调查(就像我一样)那么您的死信交换应设置为FANOUT。
以下是有效的更新步骤:
最后我们将检查它。为此,在'test_exchange'上发布一些参数'expiration'设置为10000.在此之后,当'test_exchange'上发布消息时,它将转到'test_queue',当消息在队列中过期时,它将查找DLX参数(死信交换名称)那条消息找到名称'dead_exchange'然后该消息将达到'dead_exchange'将其传递给'死队列'..如果你仍然有任何问题,如果我错过了解你的问题..写下你的问题我一定会看看它...谢谢..
注意:必须在'test_exchange'上发布消息,因为test_queue和test_exchange绑定没有路由密钥,它会正常工作但如果你在'test_queue'上发布消息,将使用默认的交换和路由密钥。然后在消息队列尝试到期后使用某个默认路由密钥将该死信息传递给dead_exchange,并且消息不会进入该队列。
如果要在死信交换上使用自定义路由密钥,则必须在声明工作队列时设置x-dead-letter-routing-key
(在您的情况下为test1
),否则将使用默认路由密钥。在您的情况下,RabbitMQ代理检测循环并简单地丢弃被拒绝的消息。
你需要的是在x-dead-letter-exchange=dead.letter.test
队列上设置x-dead-letter-routing-key=dead.letter.queue
和test1
参数。
如果您希望所有队列都具有相同的死信交换,则更容易设置一般策略:
sudo rabbitmqctl -p /my/vhost/path set_policy DLX ".*" '{"dead-letter-exchange":"MyExchange.DEAD"}' --apply-to queues
如果不是强制性的,则不需要创建FANOUT交换。
您可以使用已用于其他交换的相同路由密钥创建DIRECT交换。并且也不需要为新交换创建新队列。您可以使用现有队列进行新交换。您只需要将新交换绑定到队列。
这是我的receive.js文件:
var amqp = require("amqplib/callback_api");
var crontab = require('node-crontab');
amqp.connect("amqp://localhost", function (err, conn) {
conn.createChannel(function (err, ch) {
var ex = 'direct_logs';
var ex2 = 'dead-letter-test';
var severity = 'enterprise-1-key';
//assert "direct" exchange
ch.assertExchange(ex, 'direct', { durable: true });
//assert "dead-letter-test" exchange
ch.assertExchange(ex2, 'direct', { durable: true });
//if acknowledgement is nack() then message will be stored in second exchange i.e. ex2="dead-letter-test"
ch.assertQueue('enterprise-11', { exclusive: false, deadLetterExchange: ex2 }, function (err, q) {
var n = 0;
console.log(' [*] Waiting for logs. To exit press CTRL+C');
console.log(q);
//Binding queue with "direct_logs" exchange
ch.bindQueue(q.queue, ex, severity);
//Binding the same queue with "dead-letter-test"
ch.bindQueue(q.queue, ex2, severity);
ch.consume(q.queue, function (msg) {
// consume messages via "dead-letter-exchange" exchange at every second.
if (msg.fields.exchange === ex2) {
crontab.scheduleJob("* * * * * *", function () {
console.log("Received by latest exchange %s", msg.fields.routingKey, msg.content.toString());
});
} else {
console.log("Received %s", msg.fields.routingKey, msg.content.toString());
}
if (n < 1) {
// this will executes first time only. Here I'm sending nack() so message will be stored in "deadLetterExchange"
ch.nack(msg, false, false);
n += 1;
} else {
ch.ack(msg)
n = 0
}
}, { noAck: false });
});
});
});
创建名为“dead.letter.test”的新DIRECT交换
正确
创建新队列“dead.letter.queue”
正确
将“dead.letter.queue”绑定到“dead.letter.test”
正确
创建新队列“test1”,将死信交换设置为“dead.letter.test”
我假设您正在创建test1队列并将其绑定到dead.letter.test exchange
发送消息到“test1”
如果你希望你的消息被dead.letter.queue收到,你必须在发送消息时提供路由密钥,而使用dead.letter.queue的客户端也应该使用相同的路由密钥
如果您在没有路由密钥的情况下发布,则只有订阅测试1的客户端才会收到该消息。
如果您将消息发布到direct.letter.test exchange,则所有队列都将收到该消息。它会像扇出交换一样工作
因此,如果您希望dead.letter.queue接收消息,您将不得不在该队列中发布消息,或者您必须在发布和订阅时使用相同的路由密钥并将消息发布到交换