无法从 azure 事件中心接收消息

问题描述 投票:0回答:1

在 Spring Boot 应用程序中连接 eventhub 时出现以下异常。

@SpringBootApplication
public class VALogAnalyticsApplication implements CommandLineRunner {

private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();

public static void main(String[] args) {
    SpringApplication.run(VALogAnalyticsApplication.class, args);
}

  @Override
  public void run(String... args) {
    log.info("Going to add message {} to sendMessage." + "Hello World");
    many.emitNext(MessageBuilder.withPayload("Hello World").build(),  Sinks.EmitFailureHandler.FAIL_FAST);
  }

  @Bean
  public Consumer<Message<String>> consume() {
    return message->{
        Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
        log.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued "
                        +"time: {}",
                message.getPayload(),
                message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                message.getHeaders().get(EventHubsHeaders.OFFSET),
                message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
          );
          checkpointer.success()
                .doOnSuccess(success->log.info("Message '{}' successfully checkpointed",
                        message.getPayload()))
                .doOnError(error->log.error("Exception found", error))
                .block();
        };
    }
  }

应用程序.属性

spring.cloud.azure.eventhubs.namespace=<eventHubNameSpace>
spring.cloud.azure.eventhubs.connection-string=Endpoint=sb://test.servicebus.windows.net /;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=adfsad
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<storagAaccountName>
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key=<storageAccessKey>
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<containerName>
spring.cloud.stream.bindings.consume-in-0.destination=<eventHubName>
spring.cloud.stream.bindings.consume-in-0.group=<consumerGroup>
spring.cloud.stream.bindings.supply-out-0.destination=<eventHubName>
spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode=MANUAL
spring.cloud.function.definition=consume;supply;
spring.cloud.stream.poller.initial-delay=0
spring.cloud.stream.poller.fixed-delay=1000

错误日志:

2023-08-22 16:51:20.586  INFO 26256 --- [ctor-executor-1] c.a.c.a.i.ReactorDispatcher              : {"az.sdk.message":"Reactor selectable is being disposed.","connectionId":"MF_89ee14_1692703254531"}
2023-08-22 16:51:20.587  INFO 26256 --- [ctor-executor-1] c.a.c.a.i.ReactorConnection              : {"az.sdk.message":"onConnectionShutdown. Shutting down.","connectionId":"MF_89ee14_1692703254531","isTransient":false,"isInitiatedByClient":false,"shutdownMessage":"connectionId[MF_89ee14_1692703254531] Reactor selectable is disposed.","namespace":"test.servicebus.windows.net"}
2023-08-22 16:51:20.639 ERROR 26256 --- [ctor-executor-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: com.azure.core.amqp.exception.AmqpException: Connection timed out: no further information, errorContext[NAMESPACE: test.servicebus.windows.net. ERROR CONTEXT: N/A]
Caused by: com.azure.core.amqp.exception.AmqpException: Connection timed out: no further information, errorContext[NAMESPACE: test.servicebus.windows.net. ERROR CONTEXT: N/A]
    at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:85) ~[azure-core-amqp-2.8.7.jar:2.8.7]
    at com.azure.core.amqp.implementation.handler.ConnectionHandler.notifyErrorContext(ConnectionHandler.java:351) ~[azure-core-amqp-2.8.7.jar:2.8.7]
    at com.azure.core.amqp.implementation.handler.ConnectionHandler.onTransportError(ConnectionHandler.java:253) ~[azure-core-amqp-2.8.7.jar:2.8.7]
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:191) ~[proton-j-0.33.8.jar:na]
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108) ~[proton-j-0.33.8.jar:na]
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324) ~[proton-j-0.33.8.jar:na]
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291) ~[proton-j-0.33.8.jar:na]
    at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91) ~[azure-core-amqp-2.8.7.jar:2.8.7]
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.6.jar:3.4.6]
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.6.jar:3.4.6]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
Caused by: com.azure.core.amqp.exception.AmqpException: Connection timed out: no further information, errorContext[NAMESPACE: test.servicebus.windows.net. ERROR CONTEXT: N/A]

如果我做错了什么请纠正我

java spring-boot azure azure-eventhub
1个回答
0
投票

请检查以下几点。 1)请检查为 Azure 事件中心启用了哪些网络 - 公共/私有 2)如果您想同时访问,请在事件中心启用所有网络选项。

© www.soinside.com 2019 - 2024. All rights reserved.