我正在尝试将 azure-service-bus 集成到我的 java spring-project 中。
我的 azure-service-bus 订阅已启用
sessions
。
我正在使用
org.springframework.jms.annotation.JmsListener
设置监听器。
问题:
问题:
示例代码:
private final JmsTemplate jmsTemplate;
public void sendMessage() {
jmsTemplate.convertAndSend(TOPIC_NAME, "sample_data");
}
@JmsListener(destination = TOPIC_NAME, subscription = SUBSCRIPTION_NAME, containerFactory = "topicJmsListenerContainerFactory")
public void receiveMessage(Object data) throws JMSException {
//logic to process message
}
FYI:代码对于非会话启用的主题和订阅按预期工作。
我不断收到错误:
javax.jms.JMSException: It is not possible for an entity that requires sessions to create a non-sessionful message receiver. TrackingId:some_uuid, SystemTracker:some_uuid:subscription_name, Timestamp:2023-12-11T20:04:41 TrackingId:some_uuid, SystemTracker:some_uuid, Timestamp:2023-12-11T20:04:41 [condition = amqp:not-allowed]
at org.apache.qpid.jms.provider.ProviderException.toJMSException(ProviderException.java:34) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.JmsConnection.createResource(JmsConnection.java:698) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.JmsMessageConsumer.<init>(JmsMessageConsumer.java:125) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.JmsTopicSubscriber.<init>(JmsTopicSubscriber.java:36) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.JmsDurableTopicSubscriber.<init>(JmsDurableTopicSubscriber.java:29) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.JmsSession.createDurableSubscriber(JmsSession.java:571) ~[qpid-jms-client-0.53.0.jar:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.jms.connection.CachingConnectionFactory$CachedSessionInvocationHandler.invoke(CachingConnectionFactory.java:412) ~[spring-jms-5.3.31.jar:5.3.31]
at com.sun.proxy.$Proxy237.createDurableSubscriber(Unknown Source) ~[na:na]
at org.springframework.jms.listener.AbstractMessageListenerContainer.createConsumer(AbstractMessageListenerContainer.java:863) ~[spring-jms-5.3.31.jar:5.3.31]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:225) ~[spring-jms-5.3.31.jar:5.3.31]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1264) ~[spring-jms-5.3.31.jar:5.3.31]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1236) ~[spring-jms-5.3.31.jar:5.3.31]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1227) ~[spring-jms-5.3.31.jar:5.3.31]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1120) ~[spring-jms-5.3.31.jar:5.3.31]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.qpid.jms.provider.ProviderException: It is not possible for an entity that requires sessions to create a non-sessionful message receiver.
at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToNonFatalException(AmqpSupport.java:181) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.getOpenAbortExceptionFromRemote(AmqpResourceBuilder.java:299) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:185) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:129) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:985) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.provider.amqp.AmqpProvider.onData(AmqpProvider.java:871) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:563) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:556) ~[qpid-jms-client-0.53.0.jar:na]
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1383) ~[netty-handler-4.1.92.Final.jar:4.1.92.Final]
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1246) ~[netty-handler-4.1.92.Final.jar:4.1.92.Final]
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1295) ~[netty-handler-4.1.92.Final.jar:4.1.92.Final]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) ~[netty-codec-4.1.92.Final.jar:4.1.92.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ~[netty-codec-4.1.92.Final.jar:4.1.92.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.92.Final.jar:4.1.92.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.92.Final.jar:4.1.92.Final]
... 1 common frames omitted
此警告在代码引发异常之前出现:
2023-12-12 01:34:40.868 WARN 72600 --- [ntContainer#0-2] o.s.j.l.DefaultMessageListenerContainer:目标“topic_name”的 JMS 消息侦听器调用程序设置失败 - 尝试恢复。原因:需要会话的实体不可能创建非会话消息接收器。
我正在使用的外部 Maven 依赖项:
<!-- https://mvnrepository.com/artifact/com.azure.spring/spring-cloud-azure-starter-servicebus-jms -->
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-servicebus-jms</artifactId>
<version>4.13.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-jms -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.3.31</version>
</dependency>
我寻找直接使用 Azure Service Bus SDK 的不同方法,我采用了
ServiceBusSenderClient
来发送带有会话 ID 的消息。
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusSenderClient;
public class MessageSender {
private final ServiceBusSenderClient sender;
public MessageSender(String connectionString, String topicName) {
this.sender = new ServiceBusClientBuilder()
.connectionString(connectionString)
.sender()
.topicName(topicName)
.buildSenderClient();
}
public void sendMessageWithSessionId(String sessionId, String message) {
sender.sendMessage(sender.createMessage(message).setSessionId(sessionId));
}
}
然后我再次按照旧的重现重新配置JMS Listener,如下所示。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
@Configuration
public class JmsConfig {
@Bean
public DefaultJmsListenerContainerFactory topicJmsListenerContainerFactory(
DefaultJmsListenerContainerFactoryConfigurer configurer,
JmsTemplate jmsTemplate) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, jmsTemplate.getConnectionFactory());
factory.setSessionTransacted(true); // Enable session transactions if needed
factory.setSubscriptionDurable(true); // Enable durable subscriptions if needed
return factory;
}
}
public class MessageSender {
private final JmsTemplate jmsTemplate;
public MessageSender(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void sendMessageWithSessionId(String sessionId, String message) {
jmsTemplate.send("yourTopicName", session -> {
javax.jms.Message jmsMessage = session.createTextMessage(message);
// Set the session ID property
jmsMessage.setStringProperty("JMSXGroupID", sessionId);
return jmsMessage;
});
}
}