通过在每次重启应用程序时创建新的使用者组来重置主题偏移

问题描述 投票:0回答:2

我有一个Kafka主题和一个在Spring Cloud应用程序中分配(必须)的消费者组的消费者。作为一项要求,在每次应用程序重启时,我都需要从头开始阅读所有收到的消息。这应该是由resetOffsets财产实现的,但从this issue可以清楚地看出它目前无效。

我发现在kafka consumer api中使用的this workaround,建议在每次重启时为消费者组分配一个新的随机名称,作为从最早开始阅读的一种方式。是否可以/推荐Spring Cloud Stream?如何为消费者组定义动态名称?

apache-kafka spring-cloud-stream
2个回答
1
投票

是的,它也适用于SCSt,但正如你所说,设置随机组ID有点棘手,尽管你可以在启动System.property之前将其设置为SpringApplication

如果你直接使用spring-kafka,那很容易,只需实现ConsumerSeekAware,你可以在分配分区时使用seekToBeginning

但是,使用SCSt,您无法直接访问侦听器。

一种解决方法是在通过创建具有相同组ID的使用者启动SpringApplication之前手动执行搜索。它有点棘手,但如果您有多个应用程序实例,因为每次都可能获得不同的分区。

我们将再次考虑解决这个问题(我刚刚对此发表评论)。


1
投票

如果您需要应用程序每次从头重新启动,您有几个选项:

  1. 您可以在使用earliest工具(kafka-consumer-groups.sh)重新启动应用程序之前将提交的偏移重置为kafka.admin.ConsumerGroupCommand.scala
  2. 重新启动时,应用程序可以查找开头并手动提交偏移量0.如果将auto.offset.reset设置为earliest,即使0不是有效偏移量,它也会从头开始重新启动。
  3. 您每次都可以使用不同的消费者group.id值。在您的消费者Configuration bean中,在Properties对象中插入如下内容: properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());

最后,您是否使用委员会抵消?如果没有,只需禁用enable.auto.commit然后应用程序将始终遵循auto.offset.reset设置。

选项1和2通常是首选,因为它们保持一致的group.id,允许轻松地将消费者实例添加到组并监视组。

© www.soinside.com 2019 - 2024. All rights reserved.