在 Spring Boot 应用程序中连接 eventhub 时出现以下异常。
@SpringBootApplication
public class VALogAnalyticsApplication implements CommandLineRunner {
private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
public static void main(String[] args) {
SpringApplication.run(VALogAnalyticsApplication.class, args);
}
@Override
public void run(String... args) {
log.info("Going to add message {} to sendMessage." + "Hello World");
many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
}
@Bean
public Consumer<Message<String>> consume() {
return message->{
Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
log.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued "
+"time: {}",
message.getPayload(),
message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
message.getHeaders().get(EventHubsHeaders.OFFSET),
message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
);
checkpointer.success()
.doOnSuccess(success->log.info("Message '{}' successfully checkpointed",
message.getPayload()))
.doOnError(error->log.error("Exception found", error))
.block();
};
}
}
应用程序.属性
spring.cloud.azure.eventhubs.namespace=<eventHubNameSpace>
spring.cloud.azure.eventhubs.connection-string=Endpoint=sb://test.servicebus.windows.net /;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=adfsad
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<storagAaccountName>
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key=<storageAccessKey>
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<containerName>
spring.cloud.stream.bindings.consume-in-0.destination=<eventHubName>
spring.cloud.stream.bindings.consume-in-0.group=<consumerGroup>
spring.cloud.stream.bindings.supply-out-0.destination=<eventHubName>
spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode=MANUAL
spring.cloud.function.definition=consume;supply;
spring.cloud.stream.poller.initial-delay=0
spring.cloud.stream.poller.fixed-delay=1000
2023-08-22 16:51:20.586 INFO 26256 --- [ctor-executor-1] c.a.c.a.i.ReactorDispatcher : {"az.sdk.message":"Reactor selectable is being disposed.","connectionId":"MF_89ee14_1692703254531"}
2023-08-22 16:51:20.587 INFO 26256 --- [ctor-executor-1] c.a.c.a.i.ReactorConnection : {"az.sdk.message":"onConnectionShutdown. Shutting down.","connectionId":"MF_89ee14_1692703254531","isTransient":false,"isInitiatedByClient":false,"shutdownMessage":"connectionId[MF_89ee14_1692703254531] Reactor selectable is disposed.","namespace":"test.servicebus.windows.net"}
2023-08-22 16:51:20.639 ERROR 26256 --- [ctor-executor-1] reactor.core.publisher.Operators : Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: com.azure.core.amqp.exception.AmqpException: Connection timed out: no further information, errorContext[NAMESPACE: test.servicebus.windows.net. ERROR CONTEXT: N/A]
Caused by: com.azure.core.amqp.exception.AmqpException: Connection timed out: no further information, errorContext[NAMESPACE: test.servicebus.windows.net. ERROR CONTEXT: N/A]
at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:85) ~[azure-core-amqp-2.8.7.jar:2.8.7]
at com.azure.core.amqp.implementation.handler.ConnectionHandler.notifyErrorContext(ConnectionHandler.java:351) ~[azure-core-amqp-2.8.7.jar:2.8.7]
at com.azure.core.amqp.implementation.handler.ConnectionHandler.onTransportError(ConnectionHandler.java:253) ~[azure-core-amqp-2.8.7.jar:2.8.7]
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:191) ~[proton-j-0.33.8.jar:na]
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108) ~[proton-j-0.33.8.jar:na]
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324) ~[proton-j-0.33.8.jar:na]
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291) ~[proton-j-0.33.8.jar:na]
at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91) ~[azure-core-amqp-2.8.7.jar:2.8.7]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.6.jar:3.4.6]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
Caused by: com.azure.core.amqp.exception.AmqpException: Connection timed out: no further information, errorContext[NAMESPACE: test.servicebus.windows.net. ERROR CONTEXT: N/A]
如果我做错了什么请纠正我
请检查以下几点。 1)请检查为 Azure 事件中心启用了哪些网络 - 公共/私有 2)如果您想同时访问,请在事件中心启用所有网络选项。