使用 Node.js 创建服务总线主题消息时添加自定义消息属性

问题描述 投票:0回答:1

我有一个具有多个订阅的主题,需要为每个订阅添加一个过滤器以触发相应的功能应用程序。

这是我用于创建消息的 Node.js 代码:

const router = express.Router();
const { jsonParser, ServiceBusClient } = require('../server');

const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING;
const topicName = process.env.SERVICE_BUS_TOPIC_NAME;


router.post('/post-message', jsonParser, async (req, res) => {
    try {
        // Extract message data from the request
        const messageData = req.body;
        // Create a Service Bus client using the connection string
        const serviceBusClient = new ServiceBusClient(connectionString);

        // Create a sender for the topic
        const sender = serviceBusClient.createSender(topicName);

        const message = {
            applicationProperties: {
                eventType: "jobCosts"
            },
            subject: 'jobCosts',
            body: messageData,
        };

        console.log(message);

        // Send the message to the topic
        await sender.sendMessages(message);

        // Close the sender and the Service Bus client
        await sender.close();
        await serviceBusClient.close();

        res.send(message);

    } catch (error) {
        console.error("Error sending message to Service Bus topic:", error);
        // Handle the error, e.g., by sending an error response
        res.status(500).json({ error: "An error occurred while sending the message to the topic" });
    }
})

module.exports = router;

我有一个为名为 eventType =“jobCosts”的自定义属性配置的相关过滤器。

当我使用 Service Bus Explorer 从 Azure 门户发送消息时,消息会被正确处理,但是当我执行上面的代码时,消息会被发布,但从未触及主题。

还尝试通过传递“主题”来过滤标签/主题系统属性,但也无法使其工作。有人知道我错在哪里吗?

node.js azure azureservicebus azure-servicebus-topics
1个回答
0
投票

还尝试通过传递“主题”来过滤标签/主题系统属性,但也无法使其工作。有人知道我错在哪里吗?

我已经尝试了上面的代码,并且我的消息通过代码发送时遇到了同样的问题,但这些消息达到了死信,这是不满意的。

  • 我已为我的订阅启用会话 ID,如下所示。

enter image description here

代码:

const { ServiceBusClient } = require("@azure/service-bus");
const express = require('express');
const app = express();
const PORT = process.env.PORT || 3000;

app.get('/send-message', async (req, res) => {
    let serviceBusClient;
    let sender;
    
    try {
        const connectionString = "Endpoint=sb://your-servicebusnamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=svbfe";
        const topicName = "give-ur-topic-name";

        serviceBusClient = new ServiceBusClient(connectionString);
        sender = serviceBusClient.createSender(topicName);

        // Create a message
        const messageBody = {
            id: 1,
            eventType: "jobCosts",
            // Add other message properties if needed
        };

        const message = {
            body: JSON.stringify(messageBody),
            // Add other message properties if needed
            // For correlation filter, you can set the properties here
            correlationId: "12345", // Optional, you can set this to any unique identifier
            sessionId: "107", // Set the session ID here
        };

        console.log("Sending message:", messageBody);

        // Send the message
        await sender.sendMessages(message);
        console.log("Message sent successfully");

        res.send("Message sent successfully");
    } catch (error) {
        console.error("Error occurred:", error);
        res.status(500).send("Error occurred while sending message");
    } finally {
        try {
            if (sender) {
                await sender.close();
            }
            if (serviceBusClient) {
                await serviceBusClient.close();
            }
        } catch (error) {
            console.error("Error while closing sender or service bus client:", error);
        }
    }
});

app.listen(PORT, () => {
    console.log(`Server is running on port ${PORT}`);
});

enter image description here

  • 下面是我的两个订阅,一个启用了会话,另一个禁用了,您可以看到启用的订阅的活动消息数为 14。

enter image description here

收到消息:

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.