我们假设这样一种情况: 有一个 Java Spring 应用程序,它从 Kafka 读取消息 (客户组:我的组名)。
msg_1
、msg_2
、msg_3
msg_1
、msg_2
、msg_3
。msg_4
、msg_5
、msg_6
msg_7
、msg_8
然后,如果我将
AUTO_OFFSET_RESET_CONFIG
设置为:
latest
:应用程序将从 msg_4
、...
、msg_8
earliest
:应用程序将从 msg_1
、...
、msg_8
我的问题是:应该配置什么来只读重新连接后出现在 Kafka 上的消息? (仅限
msg_7
、msg_8
)?
只有当 kakfa 中指定的消费者组(MyGroupName)没有存储的偏移量时,Kafka 才会看到 auto.offset.reset 配置。存储的偏移量只不过是消费者应用程序提交的偏移量。如果有存储的偏移量,那么消费者将得到重新连接后,消息会从中断处继续显示。
如果存储的偏移量被删除,kafka 将检查 auto.offset.reset 配置。
a)如果设置为latest,那么重新连接后您将收到来自msg_7,msg_8的消息。
b)如果设置为最早,那么kafka将发送存储的所有可用偏移量。在这种情况下,如果msg1,msg2 ...没有被删除,那么kafka将从msg1本身发送。
因此,在您的情况下,如果您想在重新连接后仅获取新消息,请在第一次连接时设置以下属性,并进一步重新连接到 kafka。
1.
enable.auto.commit = false
2.
auto.offset.reset = latest
通过
enable.auto.commit = false
确保您的消费者应用程序不会向kafka提交偏移量,因此不会为指定的消费者组存储偏移量。
因此,如果没有存储的偏移量,那么kafka将检查最新的auto.offset.reset,这样每次重新连接时您只会收到最新的消息(msg7和msg8)。