Azure eventhub EventHubConsumerAsyncClient无法在Springboot非Web应用程序中启动

问题描述 投票:0回答:1

我正在使用Springboot构建Azure事件中心使用者,它可以与Web配置一起使用。

我正在尝试通过非Web配置获得与以下摘录相同的结果,并获得控制台所示的结果。

依赖关系:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs</artifactId>
        <version>5.0.3</version>
    </dependency>
</dependencies>

AzureEventhubConsumerApplication.java

@EnableAsync
@SpringBootApplication
public class AzureEventhubConsumerApplication {
    public static void main(String[] args) {
        new SpringApplicationBuilder(AzureEventhubConsumerApplication.class).web(WebApplicationType.NONE).run(args);
    }
}

EventProcessorHostService.java

public class EventProcessorHostService {

    @Autowired
    EventhubProperties ehProps;

    @Autowired
    TaskExecutor taskexecutor;

    @PostConstruct
    public void run() throws ExecutionException, InterruptedException {
        EventHubConsumerAsyncClient client = new EventHubClientBuilder()
                .connectionString(ehProps.getConnectionString(), ehProps.getEventHubName())
                .consumerGroup(ehProps.getStorage().getConsumerGroupName()).buildAsyncConsumerClient();

        client.receive(true).subscribe(event -> {
            PartitionContext context = event.getPartitionContext();
            EventData eData = event.getData();
            System.out.printf("Event %s is from partition %s%n.", eData.getSequenceNumber(), context.getPartitionId());

        });

    }
}

控制台

 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.2.6.RELEASE)

2020-04-29 18:38:52.905  INFO 2248 --- [  restartedMain] c.e.c.A.AzureEventhubConsumerApplication : Starting AzureEventhubConsumerApplication on Nikhils-MacBook-Pro.local with PID 2248 (/AzureEventhubConsumer/target/classes started by nikhil in /Projects/Code/AzureEventhubConsumer)
2020-04-29 18:38:52.908  INFO 2248 --- [  restartedMain] c.e.c.A.AzureEventhubConsumerApplication : No active profile set, falling back to default profiles: default
2020-04-29 18:38:52.957  INFO 2248 --- [  restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : Devtools property defaults active! Set 'spring.devtools.add-properties' to 'false' to disable
2020-04-29 18:38:53.467  INFO 2248 --- [  restartedMain] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-04-29 18:38:53.541  INFO 2248 --- [  restartedMain] c.a.m.eventhubs.EventHubClientBuilder    : connectionId[MF_84ae03_1588165733540]: Emitting a single connection.
2020-04-29 18:38:53.623  INFO 2248 --- [  restartedMain] c.a.m.e.i.EventHubConnectionProcessor    : connectionId[sbeventhumdemo.servicebus.windows.net] entityPath[spring-event-hub]: Setting next AMQP channel.
2020-04-29 18:38:53.642  INFO 2248 --- [  restartedMain] c.a.c.a.i.ReactorConnection              : connectionId[MF_84ae03_1588165733540]: Creating and starting connection to sbeventhumdemo.servicebus.windows.net:5671
2020-04-29 18:38:53.662  INFO 2248 --- [  restartedMain] c.a.c.a.implementation.ReactorExecutor   : connectionId[MF_84ae03_1588165733540], message[Starting reactor.]
2020-04-29 18:38:53.684  INFO 2248 --- [       single-1] c.a.c.a.i.handler.ConnectionHandler      : onConnectionInit hostname[sbeventhumdemo.servicebus.windows.net], connectionId[MF_84ae03_1588165733540]
2020-04-29 18:38:53.685  INFO 2248 --- [       single-1] c.a.c.a.i.handler.ReactorHandler         : connectionId[MF_84ae03_1588165733540] reactor.onReactorInit
2020-04-29 18:38:53.687  INFO 2248 --- [       single-1] c.a.c.a.i.handler.ConnectionHandler      : onConnectionLocalOpen hostname[sbeventhumdemo.servicebus.windows.net:5671], connectionId[MF_84ae03_1588165733540], errorCondition[null], errorDescription[null]
2020-04-29 18:38:53.694  INFO 2248 --- [  restartedMain] c.a.c.a.i.ReactorConnection              : Emitting new response channel. connectionId: MF_84ae03_1588165733540. entityPath: $management. linkName: mgmt.
2020-04-29 18:38:53.694  INFO 2248 --- [  restartedMain] a.i.RequestResponseChannel<mgmt-session> : connectionId[MF_84ae03_1588165733540] entityPath[$management]: Setting next AMQP channel.
2020-04-29 18:38:53.696  INFO 2248 --- [  restartedMain] c.a.m.e.i.ManagementChannel              : Management endpoint state: UNINITIALIZED
2020-04-29 18:38:53.772  INFO 2248 --- [       single-1] c.a.c.a.i.handler.ConnectionHandler      : onConnectionBound hostname[sbeventhumdemo.servicebus.windows.net], connectionId[MF_84ae03_1588165733540]
2020-04-29 18:38:53.871  INFO 2248 --- [       single-1] c.a.c.a.i.handler.ReceiveLinkHandler     : onLinkLocalOpen connectionId[MF_84ae03_1588165733540], linkName[mgmt:receiver], localSource[Source{address='$management', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
2020-04-29 18:38:53.934  INFO 2248 --- [  restartedMain] o.s.b.d.a.OptionalLiveReloadServer       : LiveReload server is running on port 35729
2020-04-29 18:38:53.961  INFO 2248 --- [  restartedMain] c.e.c.A.AzureEventhubConsumerApplication : Started AzureEventhubConsumerApplication in 1.338 seconds (JVM running for 1.897)
2020-04-29 18:38:53.971  INFO 2248 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'
azure spring-boot azure-eventhub
1个回答
0
投票

.subscribe()是无阻塞呼叫。从您的日志看来,应用程序正在退出,然后才能初始化与事件中心的连接并开始获取数据。如果允许应用程序运行更长的时间,或者在Thread.sleep操作之后添加subscribe,则应该会看到一些数据。

作为旁注,当使用来自事件中心的所有事件时,EventProcessorClient更适合生产。它可以负载平衡并跟踪已处理的事件。

private Disposable subscription;
private EventHubConsumerAsyncClient client;

@PostConstruct
public void run() throws ExecutionException, InterruptedException {
    client = new EventHubClientBuilder()
            .connectionString(ehProps.getConnectionString(), ehProps.getEventHubName())
            .consumerGroup(ehProps.getStorage().getConsumerGroupName()).buildAsyncConsumerClient();

    subscription = client.receive(true).subscribe(event -> {
        PartitionContext context = event.getPartitionContext();
        EventData eData = event.getData();
        System.out.printf("Event %s is from partition %s%n.", eData.getSequenceNumber(), context.getPartitionId());
    });
}

@PreDestroy
public void destroy() {
    if (subscription != null) {
        subscription.dispose();
    }
    if (client != null) {
        client.close();
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.