我们如何使用akka kafka连接器控制从kafka队列轮询消息

问题描述 投票:0回答:1

我们有一个用例,我们需要使用基于参与者可用性的 akka kafka 连接器来控制来自 kafka 队列的消息轮询,以便处理消息。我们有 2 个消费者,每个消费者有 5 个参与者来处理消息,我们需要根据两个消费者的参与者可用性来轮询消息。

我们观察到akka Consumer1不断地从kafka轮询消息并将它们存储在akka流中,而consumer2闲置,但我们需要控制这种行为,只要我们有可用的处理actor,那么我们只需要消费者轮询消息。

根据有关背压的 akka 文档,我们尝试添加 Sink.actorRefWithBackPressure() 来控制轮询,但背压发生在 Actor 流上,而不是消费者轮询上。

我们如何根据演员池中我处理演员的可用性来控制消费者轮询。

下面是我为特定主题创建和注册消费者的代码:

public void createAndRegisterConsumer(String groupId, ActorRef actor, SourceType sourceType, String... topics) {
    RestartSource.onFailuresWithBackoff(Duration.ofSeconds(3), Duration.ofSeconds(20), 0.2,
            () -> createRawConsumer(groupId, sourceType, topics).mapMaterializedValue(c -> {
                topicMapping.put(topicKey(topics), c);
                return c;
            })).map(ConsumerRecord::value).runWith(Sink.actorRefWithBackpressure(actor, new StreamInit(), Ack.INSTANCE, new StreamComplete(), (e) -> {
                LOGGER.error("Stream has failed", e);
                return new StreamFailed(e);
            }), materializer);

}

public Source<ConsumerRecord<byte[], String>, Consumer.Control> createRawConsumer(String groupId, SourceType sourceType, String... topics) {
    if (topics == null || topics.length == 0) {
        throw new RuntimeException("Must provide at least one topic to consume");
    }
    final ConsumerSettings<byte[], String> consumerSettings = getConsumerSettings(groupId, this.kafkaConfig, sourceType == SourceType.PLAIN);
    final Subscription subscription = Subscriptions.topics(topics);
    switch (sourceType) {
        case PLAIN:
            return Consumer.plainSource(consumerSettings, subscription);
        case AT_MOST_ONCE:
            return Consumer.atMostOnceSource(consumerSettings, subscription);
        default:
            throw new UnsupportedOperationException("The source type " + sourceType + " is not supported");
    }
}
java akka-stream alpakka reactive-kafka akka-kafka
1个回答
0
投票

完全做到这一点是不可能的:即使没有需求,消费者参与者也会定期进行轮询。部分原因是旧 Kafka 的遗留问题,不进行轮询可能会导致消费者重新平衡。

可以通过

ConsumerSettings
:

将计划轮询之间的间隔设置为任意长
consumerSettings.withPollInterval(Duration.ofHours(1))
© www.soinside.com 2019 - 2024. All rights reserved.