How do I enable a session-based JMS listener for azure-service-bus?

Problem description

I'm trying to integrate azure-service-bus into my Java Spring project.

My azure-service-bus subscription has sessions enabled.

I'm using org.springframework.jms.annotation.JmsListener to set up the listener.

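Problems

  • I cannot create a listener for the session-enabled subscription.
  • I also cannot push messages to the topic; they go to the dead-letter queue. (Reason: DeadLetterReason -> Session id is null., DeadLetterErrorDescription -> Session enabled entity doesn't allow a message whose session identifier is null.)

Question

  • How do I enable session-aware message sending and receiving for Azure Service Bus in Spring?

Sample code: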

private final JmsTemplate jmsTemplate;

public void sendMessage() {
    jmsTemplate.convertAndSend(TOPIC_NAME, "sample_data");
}

@JmsListener(destination = TOPIC_NAME, subscription = SUBSCRIPTION_NAME, containerFactory = "topicJmsListenerContainerFactory")
public void receiveMessage(Object data) throws JMSException {
    // logic to process message
}

FYI: the code works as expected for topics and subscriptions that are not session-enabled.

I keep getting this error:

javax.jms.JMSException: It is not possible for an entity that requires sessions to create a non-sessionful message receiver. TrackingId:some_uuid, SystemTracker:some_uuid:subscription_name, Timestamp:2023-12-11T20:04:41 TrackingId:some_uuid, SystemTracker:some_uuid, Timestamp:2023-12-11T20:04:41 [condition = amqp:not-allowed]
    at org.apache.qpid.jms.provider.ProviderException.toJMSException(ProviderException.java:34) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.JmsConnection.createResource(JmsConnection.java:698) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.JmsMessageConsumer.<init>(JmsMessageConsumer.java:125) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.JmsTopicSubscriber.<init>(JmsTopicSubscriber.java:36) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.JmsDurableTopicSubscriber.<init>(JmsDurableTopicSubscriber.java:29) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.JmsSession.createDurableSubscriber(JmsSession.java:571) ~[qpid-jms-client-0.53.0.jar:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.jms.connection.CachingConnectionFactory$CachedSessionInvocationHandler.invoke(CachingConnectionFactory.java:412) ~[spring-jms-5.3.31.jar:5.3.31]
    at com.sun.proxy.$Proxy237.createDurableSubscriber(Unknown Source) ~[na:na]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.createConsumer(AbstractMessageListenerContainer.java:863) ~[spring-jms-5.3.31.jar:5.3.31]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:225) ~[spring-jms-5.3.31.jar:5.3.31]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1264) ~[spring-jms-5.3.31.jar:5.3.31]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1236) ~[spring-jms-5.3.31.jar:5.3.31]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1227) ~[spring-jms-5.3.31.jar:5.3.31]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1120) ~[spring-jms-5.3.31.jar:5.3.31]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.qpid.jms.provider.ProviderException: It is not possible for an entity that requires sessions to create a non-sessionful message receiver. 
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToNonFatalException(AmqpSupport.java:181) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.getOpenAbortExceptionFromRemote(AmqpResourceBuilder.java:299) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:185) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:129) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:985) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.onData(AmqpProvider.java:871) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:563) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:556) ~[qpid-jms-client-0.53.0.jar:na]
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1383) ~[netty-handler-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1246) ~[netty-handler-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1295) ~[netty-handler-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) ~[netty-codec-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ~[netty-codec-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.92.Final.jar:4.1.92.Final]
    ... 1 common frames omitted

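This warning appears before the exception is thrown:

2023-12-12 01:34:40.868 WARN 72600 --- [ntContainer#0-2] o.s.j.l.DefaultMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'topic_name' - trying to recover. Cause: It is not possible for an entity that requires sessions to create a non-sessionful message receiver.

External Maven dependencies I'm using: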

<!-- https://mvnrepository.com/artifact/com.azure.spring/spring-cloud-azure-starter-servicebus-jms -->
        <dependency>
            <groupId>com.azure.spring</groupId>
            <artifactId>spring-cloud-azure-starter-servicebus-jms</artifactId>
            <version>4.13.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework/spring-jms -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>5.3.31</version>
        </dependency>
java spring-boot azureservicebus azure-servicebus-topics azure-servicebus-subscriptions
1 Answer

I looked into a different approach that uses the Azure Service Bus SDK directly, and used ServiceBusSenderClient to send messages with a session ID.

import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusSenderClient;

public class MessageSender {

    private final ServiceBusSenderClient sender;

    public MessageSender(String connectionString, String topicName) {
        // Build a synchronous sender bound to the topic
        this.sender = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .sender()
                .topicName(topicName)
                .buildClient();
    }

    public void sendMessageWithSessionId(String sessionId, String message) {
        // A session-enabled entity dead-letters messages whose session ID is null
        sender.sendMessage(new ServiceBusMessage(message).setSessionId(sessionId));
    }
}
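  • However, I could not get session-enabled messages working on the subscription and received the same error.

I then reconfigured the JMS listener, following the earlier reproduction, as shown below.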

import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class JmsConfig {

    // Bean name matches the containerFactory referenced by @JmsListener
    @Bean
    public DefaultJmsListenerContainerFactory topicJmsListenerContainerFactory(
            DefaultJmsListenerContainerFactoryConfigurer configurer,
            JmsTemplate jmsTemplate) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        // Apply Spring Boot's JMS defaults, reusing the template's connection factory
        configurer.configure(factory, jmsTemplate.getConnectionFactory());
        factory.setSessionTransacted(true); // Enable session transactions if needed
        factory.setSubscriptionDurable(true); // Enable durable subscriptions if needed
        return factory;
    }
}
  • When sending a message to the topic, make sure to set the session ID property.
import org.springframework.jms.core.JmsTemplate;

public class MessageSender {

    private final JmsTemplate jmsTemplate;

    public MessageSender(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void sendMessageWithSessionId(String sessionId, String message) {
        jmsTemplate.send("yourTopicName", session -> {
            javax.jms.Message jmsMessage = session.createTextMessage(message);

            // Set the session ID property (JMSXGroupID carries the group/session ID)
            jmsMessage.setStringProperty("JMSXGroupID", sessionId);

            return jmsMessage;
        });
    }
}
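Since the question reports that messages with a null session ID end up in the dead-letter queue, it may help to validate the ID before either sender above attaches it. The following is only a sketch: SessionIds and require are hypothetical names, the 128-character limit is my assumption about the Service Bus limit for session IDs, and rejecting empty strings is a conservative choice rather than a documented broker rule.

```java
/** Hypothetical helper (not part of any SDK): validates a session ID before sending. */
final class SessionIds {

    // Assumption: Service Bus limits session IDs to 128 characters.
    static final int MAX_LENGTH = 128;

    private SessionIds() {
    }

    /** Returns the given ID unchanged, or throws if it would likely be rejected. */
    static String require(String sessionId) {
        if (sessionId == null || sessionId.isEmpty()) {
            // Null IDs are what the dead-letter reason in the question complains about;
            // treating empty IDs the same way is a conservative choice.
            throw new IllegalArgumentException("sessionId must not be null or empty");
        }
        if (sessionId.length() > MAX_LENGTH) {
            throw new IllegalArgumentException(
                    "sessionId must be at most " + MAX_LENGTH + " characters");
        }
        return sessionId;
    }
}
```

A sender could then call, for example, jmsMessage.setStringProperty("JMSXGroupID", SessionIds.require(sessionId)) so that a missing ID fails fast in the application instead of being dead-lettered later.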
  • In your listener method, check that you handle the message with the correct object type.
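
The type check could look roughly like the sketch below. It assumes the listener receives an Object payload, as in the question, and that the container uses Spring's default SimpleMessageConverter, which yields a String for a TextMessage, a byte[] for a BytesMessage, and a Map for a MapMessage. PayloadInspector and describe are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical helper: branches on the runtime type of a converted JMS payload. */
final class PayloadInspector {

    private PayloadInspector() {
    }

    static String describe(Object payload) {
        if (payload instanceof String) {
            // TextMessage bodies arrive as String
            return "text: " + payload;
        }
        if (payload instanceof byte[]) {
            // BytesMessage bodies arrive as byte[]; UTF-8 is assumed here
            return "bytes: " + new String((byte[]) payload, StandardCharsets.UTF_8);
        }
        if (payload instanceof Map) {
            // MapMessage bodies arrive as Map
            return "map with keys " + ((Map<?, ?>) payload).keySet();
        }
        String type = (payload == null) ? "null" : payload.getClass().getName();
        throw new IllegalArgumentException("Unsupported payload type: " + type);
    }
}
```

Inside receiveMessage(Object data), calling PayloadInspector.describe(data) first makes an unexpected payload type fail loudly rather than being processed incorrectly.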
