所以我已将我的应用程序更新到 Spring Boot 3,特别是 3.1.6,但这个问题在我尝试过的所有 3.x 版本上都存在。 基本上,旧的轮询时间不足以再接收记录,而且由于某种原因,重新平衡似乎需要更长的时间(或其他什么?)。
基本上,我可以在 3 种不同的服务中以 2 种方式看到它:
2023-09-04 13:20:10 [2023-09-04 10:20:10,145] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group webstore-monitoring-service in Empty state. Created a new member id consumer-webstore-monitoring-service-1-86502ee0-eddf-445b-81c1-263b4d4e997a and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:20:10 [2023-09-04 10:20:10,147] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group webstore-monitoring-service in Empty state. Created a new member id consumer-webstore-monitoring-service-2-316b3fc5-30b1-49fa-ad80-c5d506ede709 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:20:10 [2023-09-04 10:20:10,152] INFO [GroupCoordinator 1]: Preparing to rebalance group webstore-monitoring-service in state PreparingRebalance with old generation 0 (__consumer_offsets-0) (reason: Adding new member consumer-webstore-monitoring-service-1-86502ee0-eddf-445b-81c1-263b4d4e997a with group instance id None; client reason: rebalance failed due to MemberIdRequiredException) (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:20:10 [2023-09-04 10:20:10,158] INFO [GroupCoordinator 1]: Stabilized group webstore-monitoring-service generation 1 (__consumer_offsets-0) with 2 members (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:20:10 [2023-09-04 10:20:10,163] INFO [GroupCoordinator 1]: Assignment received from leader consumer-webstore-monitoring-service-1-86502ee0-eddf-445b-81c1-263b4d4e997a for group webstore-monitoring-service for generation 1. The group has 2 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:20:25 [2023-09-04 10:20:25,325] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group webstore-monitoring-service in Stable state. Created a new member id consumer-webstore-monitoring-service-2-8fcb73c0-e15a-49fa-a57c-68b7136dfe12 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:20:25 [2023-09-04 10:20:25,325] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group webstore-monitoring-service in Stable state. Created a new member id consumer-webstore-monitoring-service-1-26a5e384-5b9e-4b25-b565-b9dd45989045 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:20:25 [2023-09-04 10:20:25,326] INFO [GroupCoordinator 1]: Preparing to rebalance group webstore-monitoring-service in state PreparingRebalance with old generation 1 (__consumer_offsets-0) (reason: Adding new member consumer-webstore-monitoring-service-2-8fcb73c0-e15a-49fa-a57c-68b7136dfe12 with group instance id None; client reason: rebalance failed due to MemberIdRequiredException) (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:21:07 [2023-09-04 10:21:07,177] INFO [GroupCoordinator 1]: Member consumer-webstore-monitoring-service-2-316b3fc5-30b1-49fa-ad80-c5d506ede709 in group webstore-monitoring-service has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:21:07 [2023-09-04 10:21:07,179] INFO [GroupCoordinator 1]: Member consumer-webstore-monitoring-service-1-86502ee0-eddf-445b-81c1-263b4d4e997a in group webstore-monitoring-service has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:21:07 [2023-09-04 10:21:07,181] INFO [GroupCoordinator 1]: Stabilized group webstore-monitoring-service generation 2 (__consumer_offsets-0) with 2 members (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:21:07 [2023-09-04 10:21:07,190] INFO [GroupCoordinator 1]: Assignment received from leader consumer-webstore-monitoring-service-2-8fcb73c0-e15a-49fa-a57c-68b7136dfe12 for group webstore-monitoring-service for generation 2. The group has 2 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
try (var consumer = factory.createConsumer()) {
consumer.subscribe(singletonList(topic));
// join
consumer.poll(Duration.ZERO);
// reset offset to the begging
consumer.seekToBeginning(emptyList());
var endOffsets = consumer.endOffsets(consumer.assignment());
do {
// actually poll for messages
var consumerRecords = consumer.poll(Duration.ofSeconds(2));
// ... some code omitted ...
consumer.commitSync();
}
while (notReachedEndOffsets(consumer, endOffsets));
}
消费者曾经在
consumer.poll(Duration.ZERO)
中加入群组,然后成功读取主题中的所有消息。现在发生的情况是 0 持续时间不够,在第二次轮询 2 秒中没有读取任何记录。
稍微忽略实际代码的质量/逻辑,是的,它需要返工,重点是,它有效并且是同一问题的症状。
现在,我通常可以接受较长时间的轮询,但是:
我在变更日志中找不到任何相关更改或在任何地方提到这一点
所以最终这是两个不同的问题。测试问题解释如下:
org.testcontainers.containers.GenericContainer#stop
方法,实际上只是杀死了容器因此可以通过两种方式解决这个问题:
session.timeout.ms
设置为较低的值最终它与 Spring 无关,而是与我们没有使用 3.0 之前的集成测试这一事实
阶段启动问题尚未解决,但我将忽略它,因为无论如何它都是一个糟糕的实现。