我有一个Kafka主题和一个在Spring Cloud应用程序中分配(必须)的消费者组的消费者。作为一项要求,在每次应用程序重启时,我都需要从头开始阅读所有收到的消息。这应该是由resetOffsets
财产实现的,但从this issue可以清楚地看出它目前无效。
我发现在kafka consumer api中使用的this workaround,建议在每次重启时为消费者组分配一个新的随机名称,作为从最早开始阅读的一种方式。是否可以/推荐Spring Cloud Stream?如何为消费者组定义动态名称?
是的,它也适用于SCSt,但正如你所说,设置随机组ID有点棘手,尽管你可以在启动System.property
之前将其设置为SpringApplication
。
如果你直接使用spring-kafka,那很容易,只需实现ConsumerSeekAware
,你可以在分配分区时使用seekToBeginning
。
但是,使用SCSt,您无法直接访问侦听器。
一种解决方法是在通过创建具有相同组ID的使用者启动SpringApplication
之前手动执行搜索。它有点棘手,但如果您有多个应用程序实例,因为每次都可能获得不同的分区。
我们将再次考虑解决这个问题(我刚刚对此发表评论)。
如果您需要应用程序每次从头重新启动,您有几个选项:
earliest
工具(kafka-consumer-groups.sh
)重新启动应用程序之前将提交的偏移重置为kafka.admin.ConsumerGroupCommand.scala
auto.offset.reset
设置为earliest
,即使0不是有效偏移量,它也会从头开始重新启动。group.id
值。在您的消费者Configuration
bean中,在Properties
对象中插入如下内容:
properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
最后,您是否使用委员会抵消?如果没有,只需禁用enable.auto.commit
然后应用程序将始终遵循auto.offset.reset
设置。
选项1和2通常是首选,因为它们保持一致的group.id
,允许轻松地将消费者实例添加到组并监视组。