因此,我在JS中使用rhea设计了一个基本的Publisher-Subscriber模型,该模型接受一个API请求以将数据保存到DB中,然后将其发布到队列中。
[从那里,一个订户(在下面添加了代码)将其提取并尝试将其保存在数据库中。现在我的问题是,该数据库实例在开发期间会经历很多更改,并且可能在插入操作期间导致错误。
因此,现在订户尝试推送到该DB并导致错误时,由于数据已出队,数据将丢失。我是JS的新手,所以有没有办法确保消息不会出队,除非我们确定消息已正确保存,而不必在出错时再次发布它?
我的订户的代码:
const Receiver = require("rhea");
const config = {
PORT: 5672,
host: "localhost"
};
let receiveClient;
function connectReceiver() {
const receiverConnection = Receiver.connect(config);
const receiver = receiverConnection.open_receiver("send_message");
receiver.on("connection_open", function () {
console.log("Subscriber connected through AMQP");
});
receiver.on("error", function (err) {
console.log("Error with Subscriber:", err);
});
receiver.on("message", function (element) {
if (element.message.body === 'detach') {
element.receiver.detach();
}
else if (element.message.body === 'close') {
element.receiver.close();
}
else {
//save in DB
}
}
receiveClient = receiver;
return receiveClient;
}
您可以使用这样的代码来明确接受消息或将其释放回发件人:
try {
save_in_db(event.message);
event.delivery.accept();
} catch {
event.delivery.release();
}
请参阅delivery docs了解更多信息。