向 Azure 服务总线队列发送消息时出现错误:Connection reset by peer, errorContext[NAMESPACE: <Namespace>]

问题描述 投票:0回答:1

我正在尝试使用 ClientSecretCredential 将消息发送到 Azure 服务总线上的队列。使用 Azure SDK 代码时出现错误。下面给出了代码和错误堆栈跟踪。使用的是最新的 jar,代码中没有编译错误。而通过 HTTPS 发送消息时,一切工作正常。

import com.azure.identity.ClientSecretCredential;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusSenderClient;


/**
 * Sends a single message to an Azure Service Bus queue, authenticating with a
 * client-secret credential built from {@code clientId}, {@code tenantId} and
 * {@code clientSecret}.
 */
public class App {

    /* variables removed for confidentiality*/

    /**
     * Builds a {@link ServiceBusMessage} from {@code data}, sets its content type
     * and message id, and sends it to {@code queueName} in {@code nameSpace}.
     *
     * <p>The sender client is opened in a try-with-resources statement so that it
     * is closed even when {@code sendMessage} throws; previously the client was
     * closed only on the success path.
     *
     * @throws InterruptedException kept in the signature for compatibility with existing callers
     * @throws Exception if building the client or sending the message fails
     */
    public void sendMessage() throws InterruptedException, Exception {
        ServiceBusMessage guestCheckInEvent = new ServiceBusMessage(data);
        guestCheckInEvent.setContentType(contentType);
        guestCheckInEvent.setMessageId(messageId);

        // Retained from the original code. The credential below is given the same
        // values explicitly, so it does not depend on these properties.
        // NOTE(review): consider removing, since this exposes the secret JVM-wide.
        System.setProperty("AZURE_CLIENT_ID", clientId);
        System.setProperty("AZURE_CLIENT_SECRET", clientSecret);
        System.setProperty("AZURE_TENANT_ID", tenantId);

        ClientSecretCredential credential = new ClientSecretCredentialBuilder()
            .clientId(clientId)
            .tenantId(tenantId)
            .clientSecret(clientSecret)
            .build();

        // NOTE(review): "Connection reset by peer" while HTTPS works may indicate that
        // the AMQP port is blocked on the network; the builder's
        // transportType(AmqpTransportType.AMQP_WEB_SOCKETS) option is worth trying.
        // TODO confirm against the target environment.
        try (ServiceBusSenderClient sender = new ServiceBusClientBuilder()
                .fullyQualifiedNamespace(nameSpace)
                .credential(credential)
                .sender()
                .queueName(queueName)
                .buildClient()) {
            System.out.println("Sending message");
            sender.sendMessage(guestCheckInEvent);
            System.out.println("Sent message");
        }
    }

    /**
     * Entry point: sends one message and prints the stack trace of any failure.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        App a = new App();
        try {
            a.sendMessage();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

错误

com.azure.messaging.servicebus.ServiceBusException: Retries exhausted: 3/3
        at com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.mapError(ServiceBusSenderAsyncClient.java:823)
        at reactor.core.publisher.Mono.lambda$onErrorMap$28(Mono.java:3773)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:180)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258)
        at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
        at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225)
        at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274)
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
        at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.maybeOnError(FluxConcatMapNoPrefetch.java:326)
        at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onNext(FluxConcatMapNoPrefetch.java:211)
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
        at reactor.core.publisher.SinkManyEmitterProcessor.drain(SinkManyEmitterProcessor.java:471)
        at reactor.core.publisher.SinkManyEmitterProcessor.tryEmitNext(SinkManyEmitterProcessor.java:269)
        at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
        at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
        at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190)
        at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
        at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
        at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onError(FluxTimeout.java:219)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:234)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:241)
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:315)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:180)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:180)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258)
        at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onError(FluxHide.java:142)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278)
        at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
        at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
        at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onError(FluxTimeout.java:219)
        at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onError(FluxFilterFuseable.java:162)
        at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:865)
        at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965)
        at reactor.core.publisher.FluxReplay$ReplaySubscriber.onError(FluxReplay.java:1360)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:231)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
        at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:272)
        at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:86)
        at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46)
        at reactor.core.publisher.InternalEmptySink.emitEmpty(InternalEmptySink.java:26)
        at com.azure.core.amqp.implementation.ReactorConnection.lambda$closeConnectionWork$35(ReactorConnection.java:540)
        at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73)
        at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:228)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
        at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:272)
        at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:86)
        at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46)
        at reactor.core.publisher.InternalEmptySink.emitEmpty(InternalEmptySink.java:26)
        at com.azure.core.amqp.implementation.ReactorExecutor.close(ReactorExecutor.java:188)
        at com.azure.core.amqp.implementation.ReactorExecutor.lambda$scheduleCompletePendingTasks$1(ReactorExecutor.java:173)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
        Suppressed: java.lang.Exception: #block terminated with an error
                at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:139)
                at reactor.core.publisher.Mono.block(Mono.java:1734)
                at com.azure.messaging.servicebus.ServiceBusSenderClient.sendMessage(ServiceBusSenderClient.java:192)
                at App.sendMessage(App.java:44)
                at App.main(App.java:52)
Caused by: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3
        at reactor.core.Exceptions.retryExhausted(Exceptions.java:306)
        at reactor.util.retry.RetryBackoffSpec.lambda$static$0(RetryBackoffSpec.java:68)
        at reactor.util.retry.RetryBackoffSpec.lambda$null$4(RetryBackoffSpec.java:560)
        at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onNext(FluxConcatMapNoPrefetch.java:183)
        ... 55 more
Caused by: com.azure.core.amqp.exception.AmqpException: Connection reset by peer, errorContext[NAMESPACE: <Namespace>. ERROR CONTEXT: N/A]
        at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:85)
        at com.azure.core.amqp.implementation.handler.ConnectionHandler.notifyErrorContext(ConnectionHandler.java:351)
        at com.azure.core.amqp.implementation.handler.ConnectionHandler.onTransportError(ConnectionHandler.java:253)
        at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:191)
        at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
        at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
        at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
        at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91)
        ... 8 more
azure azureservicebus amqp azure-sdk
1个回答
0
投票

使用 ClientSecretCredential 将消息发送到 Azure 服务总线上的队列。

出现该错误的原因可能是您的应用程序没有发送消息所需的角色或权限。您需要将

Azure Service Bus Data Owner
角色分配给您的应用程序,如下所示:

Azure 门户: enter image description here

将角色分配给应用程序后,我使用下面的代码,通过 Azure Java SDK 向队列发送了一条消息。

代码:

import com.azure.identity.ClientSecretCredential;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusSenderClient;


public final class App {
    public static void main(String[] args) { 

        String data="Hi how are you?";
        String contentType="text/plain";
        String messageId="6";
        String queueName="test1";
        String clientId="xxx";
        String tenantId="xxxxx";
        String clientSecret="xxxxxx";

        ClientSecretCredential credential = new ClientSecretCredentialBuilder()
            .clientId(clientId)
            .tenantId(tenantId)
            .clientSecret(clientSecret)
            .build();

    ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
            .fullyQualifiedNamespace("venkat678.servicebus.windows.net")
            .credential(credential)
            .sender()
            .queueName(queueName)
            .buildClient();
        ServiceBusMessage guestCheckInEvent = new ServiceBusMessage(data);
        guestCheckInEvent.setContentType(contentType);
        guestCheckInEvent.setMessageId(messageId);
    // send one message to the queue
    senderClient.sendMessage(guestCheckInEvent);
    System.out.println("Sent a single message to the queue with properties: " + queueName);
        }
    

输出:

Sent a single message to the queue with properties: test1

enter image description here

Azure 门户: enter image description here

参考:

Azure 服务总线队列入门 (Java) - Azure 服务总线 |微软学习

© www.soinside.com 2019 - 2024. All rights reserved.