重新连接到同一个消费者组后仅读取来自 Kafka 的新消息

问题描述 投票:0回答:1

我们假设这样一种情况: 有一个 Java Spring 应用程序,它从 Kafka 读取消息 (客户组:我的组名)。

  • 消息出现在 Kafka 上:
    msg_1
    msg_2
    msg_3
  • 应用程序在每次发送确认后读取消息:
    msg_1
    msg_2
    msg_3
  • 应用程序已关闭/崩溃
  • Kafka 上出现新消息:
    msg_4
    msg_5
    msg_6
  • 应用程序开始运行并在客户组MyGroupName中再次重新连接到Kafka (我们可以在 Kafka UI 中看到滞后:3)
  • Kafka 上出现新消息:
    msg_7
    msg_8

然后,如果我将

AUTO_OFFSET_RESET_CONFIG
设置为:

  • latest
    :应用程序将从
    msg_4
    ...
    msg_8
  • 读取消息
  • earliest
    :应用程序将从
    msg_1
    ...
    msg_8
  • 读取消息

我的问题是:应该配置什么来只读重新连接后出现在 Kafka 上的消息? (仅限

msg_7
msg_8
)?

java apache-kafka
1个回答
0
投票

只有当 kakfa 中指定的消费者组(MyGroupName)没有存储的偏移量时,Kafka 才会看到 auto.offset.reset 配置。存储的偏移量只不过是消费者应用程序提交的偏移量。如果有存储的偏移量,那么消费者将得到重新连接后,消息会从中断处继续显示。

如果存储的偏移量被删除,kafka 将检查 auto.offset.reset 配置。

a)如果设置为latest,那么重新连接后您将收到来自msg_7,msg_8的消息。

b)如果设置为最早,那么kafka将发送存储的所有可用偏移量。在这种情况下,如果msg1,msg2 ...没有被删除,那么kafka将从msg1本身发送。

因此,在您的情况下,如果您想在重新连接后仅获取新消息,请在第一次连接时设置以下属性,并进一步重新连接到 kafka。

1.

enable.auto.commit = false

2.

auto.offset.reset = latest

通过

enable.auto.commit = false
确保您的消费者应用程序不会向kafka提交偏移量,因此不会为指定的消费者组存储偏移量。

因此,如果没有存储的偏移量,那么kafka将检查最新的auto.offset.reset,这样每次重新连接时您只会收到最新的消息(msg7和msg8)。

© www.soinside.com 2019 - 2024. All rights reserved.