在最近的一次公共库升级中,我发现Kafka消费者没有在预期的时间内(2分钟)收到消息。对照代码做了很多测试,看来是
auto.offset.reset=latest
引起的(原来是earliest
)。根本原因是什么?
环境:
Spring-Boot 2.7.11; spring-kafka-test 2.8.11
主要消费者配置:
max.poll.records=250
max.poll.interval.ms=300000
heartbeat.interval.ms=3000
enable.auto.commit=false
auto.offset.reset=latest
auto create topic = true
无法复制代码,但主要使用:
ConcurrentKafkaListenerConntainerFactory
@KafkaListener annotation
主要变化是
auto.offset.reset=earliest
到latest
后续: 参考 嵌入式 Kafka 测试 - 不调用 Kafka 侦听器
gary russell 提到它可能会在容器完全启动之前发送消息。 那么在这种情况下会发生什么?
再次更新(May/19):
按照建议,我添加了这些代码(但保持 auto.offset.reset=latest):
ContainerTestUtils.waitForAssignment(container, 1) // configured 1 partition in embeddedkafka
然而,我在日志中看到:
Publishing records xxx here
o.a.k.c.c.internals.SubscriptionState: ....Resetting offset for partition xxxx to position FetchPosition(offset=2,...)
我应该在发布之前重置偏移量并且偏移量=0?