我正在尝试使用 ClientSecretCredential 将消息发送到 Azure 服务总线上的队列。使用 AzureSDK 代码时出现错误。下面给出了代码和错误跟踪。使用了最新的jar,代码中没有编译错误。 当尝试在 HTTPS 上发送消息时,它工作得非常好。
import com.azure.identity.ClientSecretCredential;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusSenderClient;
public class App {
/* variables removed for confidentiality*/
public void sendMessage() throws InterruptedException,Exception{
ServiceBusMessage guestCheckInEvent = new ServiceBusMessage(data);
guestCheckInEvent.setContentType(contentType);
guestCheckInEvent.setMessageId(messageId);
System.setProperty("AZURE_CLIENT_ID", clientId);
System.setProperty("AZURE_CLIENT_SECRET", clientSecret);
System.setProperty("AZURE_TENANT_ID", tenantId);
ClientSecretCredential credential = new ClientSecretCredentialBuilder()
.clientId(clientId)
.tenantId(tenantId)
.clientSecret(clientSecret)
.build();
ServiceBusSenderClient sender = new ServiceBusClientBuilder()
.fullyQualifiedNamespace(nameSpace)
.credential(credential)
.sender()
.queueName(queueName)
.buildClient();
System.out.println("Sending message");
sender.sendMessage(guestCheckInEvent);
System.out.println("Sent message");
sender.close();
}
public static void main(String[] args) {
App a=new App();
try {
a.sendMessage();
} catch (Exception e) {
e.printStackTrace();
}
}
}
com.azure.messaging.servicebus.ServiceBusException: Retries exhausted: 3/3
at com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.mapError(ServiceBusSenderAsyncClient.java:823)
at reactor.core.publisher.Mono.lambda$onErrorMap$28(Mono.java:3773)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:180)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258)
at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225)
at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.maybeOnError(FluxConcatMapNoPrefetch.java:326)
at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onNext(FluxConcatMapNoPrefetch.java:211)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.SinkManyEmitterProcessor.drain(SinkManyEmitterProcessor.java:471)
at reactor.core.publisher.SinkManyEmitterProcessor.tryEmitNext(SinkManyEmitterProcessor.java:269)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190)
at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onError(FluxTimeout.java:219)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:234)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:241)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:315)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:180)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:180)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onError(FluxHide.java:142)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278)
at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onError(FluxTimeout.java:219)
at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onError(FluxFilterFuseable.java:162)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:865)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965)
at reactor.core.publisher.FluxReplay$ReplaySubscriber.onError(FluxReplay.java:1360)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:231)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:272)
at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:86)
at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46)
at reactor.core.publisher.InternalEmptySink.emitEmpty(InternalEmptySink.java:26)
at com.azure.core.amqp.implementation.ReactorConnection.lambda$closeConnectionWork$35(ReactorConnection.java:540)
at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73)
at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:228)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:272)
at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:86)
at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46)
at reactor.core.publisher.InternalEmptySink.emitEmpty(InternalEmptySink.java:26)
at com.azure.core.amqp.implementation.ReactorExecutor.close(ReactorExecutor.java:188)
at com.azure.core.amqp.implementation.ReactorExecutor.lambda$scheduleCompletePendingTasks$1(ReactorExecutor.java:173)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:139)
at reactor.core.publisher.Mono.block(Mono.java:1734)
at com.azure.messaging.servicebus.ServiceBusSenderClient.sendMessage(ServiceBusSenderClient.java:192)
at App.sendMessage(App.java:44)
at App.main(App.java:52)
Caused by: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3
at reactor.core.Exceptions.retryExhausted(Exceptions.java:306)
at reactor.util.retry.RetryBackoffSpec.lambda$static$0(RetryBackoffSpec.java:68)
at reactor.util.retry.RetryBackoffSpec.lambda$null$4(RetryBackoffSpec.java:560)
at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onNext(FluxConcatMapNoPrefetch.java:183)
... 55 more
Caused by: com.azure.core.amqp.exception.AmqpException: Connection reset by peer, errorContext[NAMESPACE: <Namespace>. ERROR CONTEXT: N/A]
at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:85)
at com.azure.core.amqp.implementation.handler.ConnectionHandler.notifyErrorContext(ConnectionHandler.java:351)
at com.azure.core.amqp.implementation.handler.ConnectionHandler.onTransportError(ConnectionHandler.java:253)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:191)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91)
... 8 more
使用 ClientSecretCredential 将消息发送到 Azure 服务总线上的队列。
该错误可能是您没有发送消息的适当角色或权限。您需要将
Azure service bus Data Owner
分配给您的应用程序,如下所示:
传送门:
将角色分配给应用程序后。我尝试使用下面的代码,它使用 Azure Java SDK 向队列发送一条消息。
代码:
import com.azure.identity.ClientSecretCredential;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusSenderClient;
public final class App {
public static void main(String[] args) {
String data="Hi how are you?";
String contentType="text/plain";
String messageId="6";
String queueName="test1";
String clientId="xxx";
String tenantId="xxxxx";
String clientSecret="xxxxxx";
ClientSecretCredential credential = new ClientSecretCredentialBuilder()
.clientId(clientId)
.tenantId(tenantId)
.clientSecret(clientSecret)
.build();
ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
.fullyQualifiedNamespace("venkat678.servicebus.windows.net")
.credential(credential)
.sender()
.queueName(queueName)
.buildClient();
ServiceBusMessage guestCheckInEvent = new ServiceBusMessage(data);
guestCheckInEvent.setContentType(contentType);
guestCheckInEvent.setMessageId(messageId);
// send one message to the queue
senderClient.sendMessage(guestCheckInEvent);
System.out.println("Sent a single message to the queue with properties: " + queueName);
}
输出:
Sent a single message to the queue with properties: test1
传送门:
参考: