Alpakka kafka消费者补偿金额

问题描述 投票:1回答:1

我正在Scala中使用Alpakka-kafka来消费一个Kafka主题。这是我的代码:

    val kafkaConsumerSettings: ConsumerSettings[String, String] =
      ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers(kafkaConfig.server)
        .withGroupId(kafkaConfig.group)
        .withProperties(
          ConsumerConfig.MAX_POLL_RECORDS_CONFIG       -> "100",
          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG      -> "earliest",
          CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> "SSL"
        )

    Consumer
        .plainSource(kafkaConsumerSettings, Subscriptions.topics(kafkaConfig.topic))
        .runWith(Sink.foreach(println))

但是,使用者仅从主题中第一条未提交的消息开始轮询。我想始终从偏移量0开始,而不考虑提交的消息。对于Alpakka Consumer,如何手动指定偏移量?

scala apache-kafka kafka-consumer-api akka-stream alpakka
1个回答
0
投票

我认为您想添加几个配置项:

  1. ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> False因此您的工作永远不会保存任何偏移量

  2. ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"因此您的工作从头开始。

如果您的作业过去已经提交过偏移,则可能必须将其偏移最早重置为。

© www.soinside.com 2019 - 2024. All rights reserved.