我在总线服务中有两个主题,每个主题有 2 个订阅,并且来自同一个项目,我想使用所有四个主题。我尝试单独使用这些主题,效果很好,我使用了该主题的两个订阅,一个和另一个,问题是同时执行这两个操作。它给了我这个错误:“完全限定的命名空间”不能为空。但即使我把它放在正确的命名空间中,它也会将其覆盖为 null
cloud: azure: servicebus: topic1: bindings: consumeSuscripcionUno-in-0: group: topic1 consumeSuscripcionDos-in-0: group: topic1 connection-string: "connection-string" enabled: true topic2: bindings: consumeSuscripcionUno-in-0: group: topic2 connection-string: "connection-string" enabled: true
这是配置的一部分
我尝试了下面的Java代码来接收来自两个主题的消息,每个主题有两个订阅。
代码:
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
public class TopicConsumer {
public static void main(String[] args) {
String connection1 = "<topic1_connec_String>";
String topic1 = "topic1";
String subscription1 = "<topic1_subsrciptionName>";
String connection2 = "<topic2_connec_String>";
String topic2 = "topic2";
String subscription2 = "<topic2_subsrciptionName>";
String connection3 = "<topic1_connec_String>";
String subscription3 = "<topic1_subsrciptionName>";
String connection4 = "<topic2_connec_String>";
String subscription4 = "<topic2_subsrciptionName>";
ServiceBusProcessorClient processor1 = createProcessor(connection1, topic1, subscription1);
ServiceBusProcessorClient processor2 = createProcessor(connection2, topic2, subscription2);
ServiceBusProcessorClient processor3 = createProcessor(connection3, topic1, subscription3);
ServiceBusProcessorClient processor4 = createProcessor(connection4, topic2, subscription4);
startProcessor(processor1, "Topic 1, Subscription 1");
startProcessor(processor2, "Topic 2, Subscription 2");
startProcessor(processor3, "Topic 1, Subscription 3");
startProcessor(processor4, "Topic 2, Subscription 4");
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
stopProcessor(processor1, "Topic 1, Subscription 1");
stopProcessor(processor2, "Topic 2, Subscription 2");
stopProcessor(processor3, "Topic 1, Subscription 3");
stopProcessor(processor4, "Topic 2, Subscription 4");
}));
}
private static ServiceBusProcessorClient createProcessor(String connection, String topic, String subscription) {
return new ServiceBusClientBuilder()
.connectionString(connection)
.processor()
.topicName(topic)
.subscriptionName(subscription)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.processMessage(context -> {
System.out.printf("Received message from %s, %s: %s%n", topic, subscription, context.getMessage().getBody().toString());
context.complete();
})
.processError(context -> {
System.err.printf("Error occurred: %s%n", context.getException());
})
.buildProcessorClient();
}
private static void startProcessor(ServiceBusProcessorClient processor, String processorName) {
System.out.println("Starting the processor for " + processorName + "...");
processor.start();
}
private static void stopProcessor(ServiceBusProcessorClient processor, String processorName) {
System.out.println("Stopping the processor for " + processorName + "...");
processor.stop();
}
}
pom.xml:
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.0.0</version>
</dependency>
</dependencies>
我创建了两个主题,每个主题有两个订阅并向其发送消息,如下在 Azure 服务总线中所示。
主题1:
主题2:
输出:
它运行成功,并已收到来自两个主题的四个订阅的每条消息,如下所示。