Alpakka Kafka流永远不会被终止。

问题描述 投票:0回答:1

我们使用Alpakka Kafka流来消费Kafka的事件。下面是流的定义方式。

ConsumerSettings<GenericKafkaKey, GenericKafkaMessage> consumerSettings = 
    ConsumerSettings
        .create(actorSystem, new KafkaJacksonSerializer<>(GenericKafkaKey.class), 
                new KafkaJacksonSerializer<>(GenericKafkaMessage.class))
        .withBootstrapServers(servers).withGroupId(groupId)
        .withClientId(clientId).withProperties(clientConfigs.defaultConsumerConfig());
CommitterSettings committerSettings = CommitterSettings.create(actorSystem)
        .withMaxBatch(20)
        .withMaxInterval(Duration.ofSeconds(30));
Consumer.DrainingControl<Done> control = 
    Consumer.committableSource(consumerSettings, Subscriptions.topics(topics))
        .mapAsync(props.getMessageParallelism(), msg ->
                CompletableFuture.supplyAsync(() -> consumeMessage(msg), actorSystem.dispatcher())
                        .thenCompose(param -> CompletableFuture.supplyAsync(() -> msg.committableOffset())))
        .toMat(Committer.sink(committerSettings), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(materializer);

这是一段关闭流的代码。

CompletionStage<Done> completionStage = control.drainAndShutdown(actorSystem.dispatcher());
completionStage.toCompletableFuture().join();

我试着在可完成的future上做一个get too。但无论是join还是get on future都没有返回。有没有人也遇到过类似的问题?是不是我做错了什么?

apache-kafka akka akka-stream alpakka
1个回答
0
投票

如果你想从流的外部控制流的终止,你需要使用KillSwitch 。https:/doc.akka.iodocsakkacurrentstream-dynamic.html。


0
投票

你的用法看起来是正确的,我无法找出任何会阻碍排水的东西。

与Alpakka Kafka消费者共同错过的事情是 stop-timeout 默认值为30秒。DrainingControl 你可以安全地将其设置为0秒。

请看 https:/doc.akka.iodocsalpakka-kafkacurrentconsumer.html#drain-control

© www.soinside.com 2019 - 2024. All rights reserved.