I want to consume two Service Bus topics


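I have two topics in Azure Service Bus, each with 2 subscriptions, all consumed from the same project, and I want to consume all four subscriptions. Consuming one topic at a time works fine: I can read from both of that topic's subscriptions, one after the other. The problem appears when I try to consume both topics at the same time. It gives me this error: 'fullyQualifiedNamespace' cannot be null. Even when I set the correct namespace, it gets overwritten with null.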

cloud:
  azure:
    servicebus:
      topic1:
        bindings:
          consumeSuscripcionUno-in-0:
            group: topic1
          consumeSuscripcionDos-in-0:
            group: topic1
        connection-string: "connection-string"
        enabled: true
      topic2:
        bindings:
          consumeSuscripcionUno-in-0:
            group: topic2
        connection-string: "connection-string"
        enabled: true

This is part of the configuration.
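A quick sanity check that may help narrow this down: the fully qualified namespace is normally the host in the `Endpoint=sb://...` segment of a Service Bus connection string. The sketch below uses only the Java standard library to pull that host out of a string, so each configured value can be checked before it is handed to a client. The class name `NamespaceCheck`, the method `fullyQualifiedNamespace`, and the sample namespace `example-ns` are made up for illustration. It does not diagnose how the binder merges the two topic sections.

```java
import java.util.Optional;

public class NamespaceCheck {

    private static final String ENDPOINT_KEY = "Endpoint=";

    // Returns the host from the "Endpoint=sb://<host>/" segment, or empty if there is none.
    public static Optional<String> fullyQualifiedNamespace(String connectionString) {
        if (connectionString == null) {
            return Optional.empty();
        }
        for (String part : connectionString.split(";")) {
            String segment = part.trim();
            // Match the key case-insensitively.
            if (segment.regionMatches(true, 0, ENDPOINT_KEY, 0, ENDPOINT_KEY.length())) {
                String value = segment.substring(ENDPOINT_KEY.length()).trim();
                // Drop the scheme (for example "sb://") if present.
                int scheme = value.indexOf("://");
                String host = scheme >= 0 ? value.substring(scheme + 3) : value;
                // Drop a trailing slash or path.
                int slash = host.indexOf('/');
                if (slash >= 0) {
                    host = host.substring(0, slash);
                }
                return host.isEmpty() ? Optional.empty() : Optional.of(host);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Hypothetical connection string; the key values are placeholders.
        String sample = "Endpoint=sb://example-ns.servicebus.windows.net/;"
                + "SharedAccessKeyName=name;SharedAccessKey=key";
        System.out.println(fullyQualifiedNamespace(sample).orElse("<missing>"));
        // A value with no Endpoint segment yields no namespace.
        System.out.println(fullyQualifiedNamespace("connection-string").orElse("<missing>"));
    }
}
```

If a value prints `<missing>`, the client has no namespace to derive from that string, which would be consistent with the error above, though other causes are possible.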

java azure azureservicebus servicebus
1 Answer

I tried the Java code below to receive messages from two topics, each with two subscriptions.

Code

import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;

public class TopicConsumer {
    public static void main(String[] args) {

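        // Topic 1, first subscription
        String connection1 = "<topic1_connection_string>";
        String topic1 = "topic1";
        String subscription1 = "<topic1_first_subscription_name>";

        // Topic 2, first subscription
        String connection2 = "<topic2_connection_string>";
        String topic2 = "topic2";
        String subscription2 = "<topic2_first_subscription_name>";

        // Topic 1, second subscription (same connection string as connection1)
        String connection3 = "<topic1_connection_string>";
        String subscription3 = "<topic1_second_subscription_name>";

        // Topic 2, second subscription (same connection string as connection2)
        String connection4 = "<topic2_connection_string>";
        String subscription4 = "<topic2_second_subscription_name>";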

        ServiceBusProcessorClient processor1 = createProcessor(connection1, topic1, subscription1);
        ServiceBusProcessorClient processor2 = createProcessor(connection2, topic2, subscription2);

        ServiceBusProcessorClient processor3 = createProcessor(connection3, topic1, subscription3);
        ServiceBusProcessorClient processor4 = createProcessor(connection4, topic2, subscription4);

        startProcessor(processor1, "Topic 1, Subscription 1");
        startProcessor(processor2, "Topic 2, Subscription 2");
        startProcessor(processor3, "Topic 1, Subscription 3");
        startProcessor(processor4, "Topic 2, Subscription 4");

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            stopProcessor(processor1, "Topic 1, Subscription 1");
            stopProcessor(processor2, "Topic 2, Subscription 2");
            stopProcessor(processor3, "Topic 1, Subscription 3");
            stopProcessor(processor4, "Topic 2, Subscription 4");
        }));
    }

    private static ServiceBusProcessorClient createProcessor(String connection, String topic, String subscription) {
        return new ServiceBusClientBuilder()
                .connectionString(connection)
                .processor()
                .topicName(topic)
                .subscriptionName(subscription)
                .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
                .processMessage(context -> {
                    System.out.printf("Received message from %s, %s: %s%n", topic, subscription, context.getMessage().getBody().toString());
                    context.complete();
                })
                .processError(context -> {
                    System.err.printf("Error occurred: %s%n", context.getException());
                })
                .buildProcessorClient();
    }

    private static void startProcessor(ServiceBusProcessorClient processor, String processorName) {
        System.out.println("Starting the processor for " + processorName + "...");
        processor.start();
    }

    private static void stopProcessor(ServiceBusProcessorClient processor, String processorName) {
        System.out.println("Stopping the processor for " + processorName + "...");
        processor.stop();
    }
}
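The four near-identical variable groups above could also be driven from a small table of topic and subscription names. The sketch below shows only that looping pattern with the Java standard library. `SubscriptionFanOut`, `createAll`, and the subscription names `sub-a` to `sub-d` are hypothetical, and a string-building lambda stands in for the real processor factory so the example runs without the Azure SDK.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class SubscriptionFanOut {

    // Calls the factory once per (topic, subscription) pair, in the map's iteration order.
    public static <P> List<P> createAll(Map<String, List<String>> subscriptionsByTopic,
                                        BiFunction<String, String, P> factory) {
        List<P> created = new ArrayList<>();
        subscriptionsByTopic.forEach((topic, subscriptions) -> {
            for (String subscription : subscriptions) {
                created.add(factory.apply(topic, subscription));
            }
        });
        return created;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the topics in insertion order.
        Map<String, List<String>> targets = new LinkedHashMap<>();
        targets.put("topic1", List.of("sub-a", "sub-b")); // hypothetical subscription names
        targets.put("topic2", List.of("sub-c", "sub-d"));

        // Stand-in factory: builds a label instead of a real processor.
        List<String> labels = createAll(targets, (topic, sub) -> topic + "/" + sub);
        labels.forEach(System.out::println);
    }
}
```

With the answer's code, the factory could instead be something like `(topic, sub) -> createProcessor(connection, topic, sub)`, followed by a loop that calls `start()` on each returned processor. That wiring is an assumption about how the pieces would fit together, not something taken from the original answer.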

pom.xml

<dependencies>
     <dependency>
          <groupId>com.azure</groupId>
          <artifactId>azure-messaging-servicebus</artifactId>
          <version>7.0.0</version>
     </dependency>
</dependencies>

I created two topics, each with two subscriptions, and sent messages to them, as shown below in Azure Service Bus.

Topic 1:

[screenshot]

Topic 2:

[screenshot]

Output

It ran successfully and received every message from all four subscriptions of the two topics, as shown below.

[screenshot]
