使用jms将消息发布到servicebus时令牌已过期

问题描述 投票:0回答:1

我们目前使用的技术有:

  1. Springboot(3.1.2)
  2. Azure ServiceBus(主题)
  3. Spring JMS

我们按照Spring中的jms访问Azure服务总线文档。如果您需要进一步了解。

我提供了一个高级概述,我们使用 JMS 来处理 Azure ServiceBus 主题中的消息发布(JMSTemplate)和监听(JMSListner)。 JMS 管理授权和令牌生成,我们使用 clientId/secret 建立连接。

在这里,我们必须发布有关该主题的近百万条消息。最终,发布过程太长。因此,在发布消息过程正在进行期间,我们会获取下面提到的错误。因此,我们可能会错过主题中的一些消息。

org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing
    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:311) ~[spring-jms-6.0.11.jar!/:6.0.11]
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:184) ~[spring-jms-6.0.11.jar!/:6.0.11]
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:510) ~[spring-jms-6.0.11.jar!/:6.0.11]
    at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:587) ~[spring-jms-6.0.11.jar!/:6.0.11]
    at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:664) ~[spring-jms-6.0.11.jar!/:6.0.11]
    at com.cofinity.gr.orchestration.azure.servicebus.AzureServiceBusProvider.publishMessage(AzureServiceBusProvider.java:34) ~[classes!/:0.0.1-SNAPSHOT]
    at com.cofinity.gr.orchestration.azure.servicebus.AzureServiceBusProvider.publishMessage(AzureServiceBusProvider.java:27) ~[classes!/:0.0.1-SNAPSHOT]
    at com.cofinity.gr.orchestration.service.RawDataService.waitForCurationMessage(RawDataService.java:932) ~[classes!/:0.0.1-SNAPSHOT]
    at com.cofinity.gr.orchestration.service.RawDataService.processClusterEntitiesWithValidationFailed(RawDataService.java:272) ~[classes!/:0.0.1-SNAPSHOT]
    at com.cofinity.gr.orchestration.service.RawDataService.processToGenerateBpnV1(RawDataService.java:214) ~[classes!/:0.0.1-SNAPSHOT]
    at jdk.internal.reflect.GeneratedMethodAccessor188.invoke(Unknown Source) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Unknown Source) ~[na:na]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343) ~[spring-aop-6.0.11.jar!/:6.0.11]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:704) ~[spring-aop-6.0.11.jar!/:6.0.11]
    at com.cofinity.gr.orchestration.service.RawDataService$$SpringCGLIB$$0.processToGenerateBpnV1(<generated>) ~[classes!/:0.0.1-SNAPSHOT]
    at com.cofinity.gr.orchestration.tamr.bpn.BPNProcessor.process(BPNProcessor.java:32) ~[classes!/:0.0.1-SNAPSHOT]
    at com.cofinity.gr.orchestration.tamr.bpn.BPNProcessor.process(BPNProcessor.java:20) ~[classes!/:0.0.1-SNAPSHOT]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:146) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:322) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:220) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:389) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:313) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-6.0.11.jar!/:6.0.11]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:256) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:362) ~[spring-batch-infrastructure-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:206) ~[spring-batch-infrastructure-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:139) ~[spring-batch-infrastructure-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:241) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:227) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:153) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:417) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:132) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:316) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:157) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
    at org.springframework.core.task.SimpleAsyncTaskExecutor$ConcurrencyThrottlingRunnable.run(SimpleAsyncTaskExecutor.java:285) ~[spring-core-6.0.11.jar!/:6.0.11]
    at java.base/java.lang.Thread.run(Unknown Source) ~[na:na]
Caused by: jakarta.jms.JMSException: ExpiredToken: The token is expired. TrackingId:17702e61-fdc7-4d78-9cb8-fd482356a53e_G13, SystemTracker:NoSystemTracker, Timestamp:2024-01-29T11:26:36 TrackingId:f32eed2ed4f849219cd6513f2ae5e58f_G13, SystemTracker:gateway7, Timestamp:2024-01-29T11:26:36 [condition = com.microsoft:auth-failed]
    at org.apache.qpid.jms.provider.ProviderException.toJMSException(ProviderException.java:34) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.JmsConnection.send(JmsConnection.java:778) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.JmsNoTxTransactionContext.send(JmsNoTxTransactionContext.java:37) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.JmsSession.send(JmsSession.java:976) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.JmsSession.send(JmsSession.java:855) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.JmsMessageProducer.sendMessage(JmsMessageProducer.java:252) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.JmsMessageProducer.send(JmsMessageProducer.java:200) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.messaginghub.pooled.jms.JmsPoolMessageProducer.sendMessage(JmsPoolMessageProducer.java:194) ~[pooled-jms-3.1.0.jar!/:na]
    at org.messaginghub.pooled.jms.JmsPoolMessageProducer.send(JmsPoolMessageProducer.java:88) ~[pooled-jms-3.1.0.jar!/:na]
    at org.messaginghub.pooled.jms.JmsPoolMessageProducer.send(JmsPoolMessageProducer.java:77) ~[pooled-jms-3.1.0.jar!/:na]
    at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:637) ~[spring-jms-6.0.11.jar!/:6.0.11]
    at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:611) ~[spring-jms-6.0.11.jar!/:6.0.11]
    at org.springframework.jms.core.JmsTemplate.lambda$send$3(JmsTemplate.java:589) ~[spring-jms-6.0.11.jar!/:6.0.11]
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:507) ~[spring-jms-6.0.11.jar!/:6.0.11]
    ... 36 common frames omitted
Caused by: org.apache.qpid.jms.provider.ProviderException: ExpiredToken: The token is expired. TrackingId:17702e61-fdc7-4d78-9cb8-fd482356a53e_G13, SystemTracker:NoSystemTracker, Timestamp:2024-01-29T11:26:36 TrackingId:f32eed2ed4f849219cd6513f2ae5e58f_G13, SystemTracker:gateway7, Timestamp:2024-01-29T11:26:36 [condition = com.microsoft:auth-failed]
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToNonFatalException(AmqpSupport.java:181) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.getOpenAbortExceptionFromRemote(AmqpResourceBuilder.java:305) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:191) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:132) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:992) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.onData(AmqpProvider.java:878) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:548) ~[qpid-jms-client-2.0.0.jar!/:na]
    at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:541) ~[qpid-jms-client-2.0.0.jar!/:na]
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1383) ~[netty-handler-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1246) ~[netty-handler-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1295) ~[netty-handler-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) ~[netty-codec-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ~[netty-codec-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) ~[netty-transport-classes-epoll-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499) ~[netty-transport-classes-epoll-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397) ~[netty-transport-classes-epoll-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.94.Final.jar!/:4.1.94.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.94.Final.jar!/:4.1.94.Final]
    ... 1 common frames omitted

几分钟后,JMS 将再次为提供者生成令牌,消息发布将恢复。

spring-boot azure jms azureservicebus spring-jms
1个回答
0
投票

因此,在发布消息过程正在进行期间,我们会获取下面提到的错误。因此,我们可能会错过主题中的一些消息。

需要实现token刷新逻辑。应在令牌过期之前刷新令牌,以保持与 Azure 服务总线的持续通信。

Token刷新实现:

import com.microsoft.azure.servicebus.primitives.ServiceBusException;

public class MessageSender {

    private TokenManager tokenManager;
    private ServiceBusClient serviceBusClient;

    public MessageSender(TokenManager tokenManager, ServiceBusClient serviceBusClient) {
        this.tokenManager = tokenManager;
        this.serviceBusClient = serviceBusClient;
    }

    public void sendMessage(String message) {
        try {
            // Send message using the current token
            serviceBusClient.sendMessage(message);
        } catch (ServiceBusException ex) {
            if (ex instanceof ExpiredTokenException) {
                // Token expired, refresh the token
                String newToken = tokenManager.refreshToken();
                // Update the authentication credentials
                serviceBusClient.updateCredentials(newToken);
                // Retry sending the message
                serviceBusClient.sendMessage(message);
            } else {
                // Handle other exceptions
                ex.printStackTrace();
            }
        }
    }
}

public class TokenManager {

    public String refreshToken() {
        // Logic to refresh the token
        // Make a request to Azure Active Directory or other authentication provider
        // Return the new token
    }
}

public class ServiceBusClient {

    private String token;

    public ServiceBusClient(String token) {
        this.token = token;
    }

    public void sendMessage(String message) throws ServiceBusException {
        // Send message using the token
    }

    public void updateCredentials(String newToken) {
        // Update the token
        this.token = newToken;
    }
}
  • TokenManager
    负责刷新令牌,
    ServiceBusClient
    封装与Azure服务总线的通信,
    MessageSender
    使用
    ServiceBusClient
  • 发送消息

遇到

ExpiredTokenException
时,将刷新令牌,并使用新令牌重试该操作。

enter image description here

ServiceBusClient
类封装了向Azure服务总线发送消息的逻辑。它维护当前的身份验证令牌并在必要时更新它。

参考:

© www.soinside.com 2019 - 2024. All rights reserved.