Kafka消费者在更改auto.offset.reset=earliest to latest in embeddedKafka junit test后没有在时间范围内收到消息

问题描述 投票:0回答:0

在最近的一次公共库升级中,我发现Kafka消费者没有在预期的时间内(2分钟)收到消息。对照代码做了很多测试,看来是

auto.offset.reset=latest
引起的(原来是
earliest
)。根本原因是什么?

环境:

Spring-Boot 2.7.11; spring-kafka-test 2.8.11

主要消费者配置:

max.poll.records=250
max.poll.interval.ms=300000
heartbeat.interval.ms=3000
enable.auto.commit=false
auto.offset.reset=latest
auto create topic = true

无法复制代码,但主要使用:

ConcurrentKafkaListenerConntainerFactory
@KafkaListener annotation

主要变化是

auto.offset.reset=earliest
latest

后续: 参考 嵌入式 Kafka 测试 - 不调用 Kafka 侦听器

EmbeddedKafka在后续测试延迟后向消费者发送消息

gary russell 提到它可能会在容器完全启动之前发送消息。 那么在这种情况下会发生什么?

  • 消息到达主题了吗?
  • offset=latest,不会指向我发送的消息吗?

再次更新(May/19):

按照建议,我添加了这些代码(但保持 auto.offset.reset=latest):

ContainerTestUtils.waitForAssignment(container, 1) // configured 1 partition in embeddedkafka

然而,我在日志中看到:

Publishing records xxx here
o.a.k.c.c.internals.SubscriptionState: ....Resetting offset for partition xxxx to position FetchPosition(offset=2,...)  

我应该在发布之前重置偏移量并且偏移量=0?

spring-kafka offset spring-kafka-test embedded-kafka
© www.soinside.com 2019 - 2024. All rights reserved.