我有一个具有多个订阅的主题,需要为每个订阅添加一个过滤器以触发相应的功能应用程序。
这是我用于创建消息的 Node.js 代码:
const router = express.Router();
const { jsonParser, ServiceBusClient } = require('../server');
const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING;
const topicName = process.env.SERVICE_BUS_TOPIC_NAME;
router.post('/post-message', jsonParser, async (req, res) => {
try {
// Extract message data from the request
const messageData = req.body;
// Create a Service Bus client using the connection string
const serviceBusClient = new ServiceBusClient(connectionString);
// Create a sender for the topic
const sender = serviceBusClient.createSender(topicName);
const message = {
applicationProperties: {
eventType: "jobCosts"
},
subject: 'jobCosts',
body: messageData,
};
console.log(message);
// Send the message to the topic
await sender.sendMessages(message);
// Close the sender and the Service Bus client
await sender.close();
await serviceBusClient.close();
res.send(message);
} catch (error) {
console.error("Error sending message to Service Bus topic:", error);
// Handle the error, e.g., by sending an error response
res.status(500).json({ error: "An error occurred while sending the message to the topic" });
}
})
module.exports = router;
我有一个为名为 eventType =“jobCosts”的自定义属性配置的相关过滤器。
当我使用 Service Bus Explorer 从 Azure 门户发送消息时,消息会被正确处理,但是当我执行上面的代码时,消息会被发布,但从未触及主题。
还尝试通过传递“主题”来过滤标签/主题系统属性,但也无法使其工作。有人知道我错在哪里吗?
还尝试通过传递“主题”来过滤标签/主题系统属性,但也无法使其工作。有人知道我错在哪里吗?
我已经尝试了上面的代码,并且我的消息通过代码发送时遇到了同样的问题,但这些消息达到了死信,这是不满意的。
代码:
const { ServiceBusClient } = require("@azure/service-bus");
const express = require('express');
const app = express();
const PORT = process.env.PORT || 3000;
app.get('/send-message', async (req, res) => {
let serviceBusClient;
let sender;
try {
const connectionString = "Endpoint=sb://your-servicebusnamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=svbfe";
const topicName = "give-ur-topic-name";
serviceBusClient = new ServiceBusClient(connectionString);
sender = serviceBusClient.createSender(topicName);
// Create a message
const messageBody = {
id: 1,
eventType: "jobCosts",
// Add other message properties if needed
};
const message = {
body: JSON.stringify(messageBody),
// Add other message properties if needed
// For correlation filter, you can set the properties here
correlationId: "12345", // Optional, you can set this to any unique identifier
sessionId: "107", // Set the session ID here
};
console.log("Sending message:", messageBody);
// Send the message
await sender.sendMessages(message);
console.log("Message sent successfully");
res.send("Message sent successfully");
} catch (error) {
console.error("Error occurred:", error);
res.status(500).send("Error occurred while sending message");
} finally {
try {
if (sender) {
await sender.close();
}
if (serviceBusClient) {
await serviceBusClient.close();
}
} catch (error) {
console.error("Error while closing sender or service bus client:", error);
}
}
});
app.listen(PORT, () => {
console.log(`Server is running on port ${PORT}`);
});
收到消息: