Spring Boot 3 + Kafka: consumer doesn't join the group / fetch records within the same poll timeout as on SB 2.7


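So I've updated my application to Spring Boot 3, specifically 3.1.6, but the issue is present on every 3.x version I've tried. Basically, the old poll timeout is no longer long enough to receive records, and for some reason rebalancing seems to take longer (or something else is going on?).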

I can see it in 3 different services, in 2 ways:

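  • During tests, I run a Kafka Testcontainer. My Spring Boot test successfully connects to it to consume messages. Then I have an assertion that keeps polling for messages for 10 seconds and fails if the expected message hasn't arrived within those 10 seconds. This used to work on SB 2.7 with the same Kafka Testcontainer; now it only works if I increase the timeout to 15-20 seconds. I can see the following logs: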
2023-09-04 13:20:10 [2023-09-04 10:20:10,145] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group webstore-monitoring-service in Empty state. Created a new member id consumer-webstore-monitoring-service-1-86502ee0-eddf-445b-81c1-263b4d4e997a and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:20:10 [2023-09-04 10:20:10,147] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group webstore-monitoring-service in Empty state. Created a new member id consumer-webstore-monitoring-service-2-316b3fc5-30b1-49fa-ad80-c5d506ede709 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:20:10 [2023-09-04 10:20:10,152] INFO [GroupCoordinator 1]: Preparing to rebalance group webstore-monitoring-service in state PreparingRebalance with old generation 0 (__consumer_offsets-0) (reason: Adding new member consumer-webstore-monitoring-service-1-86502ee0-eddf-445b-81c1-263b4d4e997a with group instance id None; client reason: rebalance failed due to MemberIdRequiredException) (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:20:10 [2023-09-04 10:20:10,158] INFO [GroupCoordinator 1]: Stabilized group webstore-monitoring-service generation 1 (__consumer_offsets-0) with 2 members (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:20:10 [2023-09-04 10:20:10,163] INFO [GroupCoordinator 1]: Assignment received from leader consumer-webstore-monitoring-service-1-86502ee0-eddf-445b-81c1-263b4d4e997a for group webstore-monitoring-service for generation 1. The group has 2 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:20:25 [2023-09-04 10:20:25,325] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group webstore-monitoring-service in Stable state. Created a new member id consumer-webstore-monitoring-service-2-8fcb73c0-e15a-49fa-a57c-68b7136dfe12 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:20:25 [2023-09-04 10:20:25,325] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group webstore-monitoring-service in Stable state. Created a new member id consumer-webstore-monitoring-service-1-26a5e384-5b9e-4b25-b565-b9dd45989045 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:20:25 [2023-09-04 10:20:25,326] INFO [GroupCoordinator 1]: Preparing to rebalance group webstore-monitoring-service in state PreparingRebalance with old generation 1 (__consumer_offsets-0) (reason: Adding new member consumer-webstore-monitoring-service-2-8fcb73c0-e15a-49fa-a57c-68b7136dfe12 with group instance id None; client reason: rebalance failed due to MemberIdRequiredException) (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:21:07 [2023-09-04 10:21:07,177] INFO [GroupCoordinator 1]: Member consumer-webstore-monitoring-service-2-316b3fc5-30b1-49fa-ad80-c5d506ede709 in group webstore-monitoring-service has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:21:07 [2023-09-04 10:21:07,179] INFO [GroupCoordinator 1]: Member consumer-webstore-monitoring-service-1-86502ee0-eddf-445b-81c1-263b4d4e997a in group webstore-monitoring-service has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:21:07 [2023-09-04 10:21:07,181] INFO [GroupCoordinator 1]: Stabilized group webstore-monitoring-service generation 2 (__consumer_offsets-0) with 2 members (kafka.coordinator.group.GroupCoordinator)
2023-09-04 13:21:07 [2023-09-04 10:21:07,190] INFO [GroupCoordinator 1]: Assignment received from leader consumer-webstore-monitoring-service-2-8fcb73c0-e15a-49fa-a57c-68b7136dfe12 for group webstore-monitoring-service for generation 2. The group has 2 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator) 
  • In my staging environment, a different service has the following logic:
        try (var consumer = factory.createConsumer()) {
            consumer.subscribe(singletonList(topic));
            // join
            consumer.poll(Duration.ZERO);
            // reset the offset to the beginning; an empty collection means
            // "all currently assigned partitions"
            consumer.seekToBeginning(emptyList());
            
            var endOffsets = consumer.endOffsets(consumer.assignment());

            do {
                // actually poll for messages
                var consumerRecords = consumer.poll(Duration.ofSeconds(2));
                // ... some code omitted ...
                consumer.commitSync();
            }
            while (notReachedEndOffsets(consumer, endOffsets));
        }

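The consumer used to join the group during consumer.poll(Duration.ZERO) and then successfully read all the messages in the topic. What happens now is that a zero duration isn't enough, and no records are read during the second, 2-second poll.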
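
The notReachedEndOffsets helper isn't shown in the question. Below is a minimal sketch of one plausible implementation, assuming it compares each partition's current position with the end offsets captured before the loop. It is written generically over the partition key so it runs without kafka-clients; in the real code the key would presumably be TopicPartition and the positions would come from consumer.position(tp). The class name EndOffsetCheck is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class EndOffsetCheck {

    // Hypothetical stand-in for the question's notReachedEndOffsets helper.
    // Returns true while at least one partition's current position is still
    // below the end offset captured before the read loop started.
    static <K> boolean notReachedEndOffsets(Map<K, Long> positions, Map<K, Long> endOffsets) {
        for (Map.Entry<K, Long> end : endOffsets.entrySet()) {
            // A partition with no known position counts as "not reached yet".
            Long position = positions.get(end.getKey());
            if (position == null || position < end.getValue()) {
                return true;
            }
        }
        // Note: an empty endOffsets map always yields false here.
        return false;
    }

    public static void main(String[] args) {
        Map<String, Long> endOffsets = Map.of("orders-0", 10L, "orders-1", 4L);
        Map<String, Long> positions = new HashMap<>(Map.of("orders-0", 10L, "orders-1", 2L));
        System.out.println(notReachedEndOffsets(positions, endOffsets)); // true
        positions.put("orders-1", 4L);
        System.out.println(notReachedEndOffsets(positions, endOffsets)); // false
    }
}
```

If the helper works roughly like this, it would also explain the symptom: when the join hasn't completed by the time poll(Duration.ZERO) returns, consumer.assignment() is empty, endOffsets(...) is computed for no partitions, and the do/while loop exits after a single 2-second poll.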

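Setting aside the quality/logic of the actual code for a moment (yes, it needs rework), the point is that it used to work, and it is a symptom of the same issue.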
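
One way to stop depending on a single poll(Duration.ZERO) completing the group join is to keep polling until the consumer actually owns partitions, with an explicit upper bound. This is a minimal sketch, assuming the caller adapts a real consumer through two function references; AssignmentWait and awaitAssignment are hypothetical names, and a fake consumer is used so the example runs without a broker.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class AssignmentWait {

    // Polls repeatedly until the group coordinator has handed this consumer at
    // least one partition, or until the timeout elapses. Returns true if an
    // assignment was obtained. Records returned by these polls are ignored;
    // the caller is expected to seek (e.g. seekToBeginning) afterwards.
    static boolean awaitAssignment(Consumer<Duration> poll,
                                   Supplier<? extends Collection<?>> assignment,
                                   Duration timeout,
                                   Duration pollStep) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (assignment.get().isEmpty()) {
            // Overflow-safe deadline check, as recommended for System.nanoTime().
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            poll.accept(pollStep);
        }
        return true;
    }

    public static void main(String[] args) {
        // Fake consumer that only gets a partition after its third poll,
        // standing in for a group join that needs several round trips.
        int[] polls = {0};
        Supplier<Set<String>> fakeAssignment =
                () -> polls[0] >= 3 ? Set.of("topic-0") : Set.of();
        boolean assigned = awaitAssignment(d -> polls[0]++, fakeAssignment,
                Duration.ofSeconds(5), Duration.ofMillis(100));
        System.out.println(assigned + " after " + polls[0] + " polls"); // true after 3 polls
    }
}
```

With a KafkaConsumer this would be called roughly as awaitAssignment(d -> consumer.poll(d), consumer::assignment, Duration.ofSeconds(60), Duration.ofMillis(100)) before consumer.seekToBeginning(consumer.assignment()). The 60-second bound is an arbitrary choice; it likely needs to exceed session.timeout.ms when stale members may still be in the group. An alternative is to pass a ConsumerRebalanceListener to subscribe(...) and do the seek in onPartitionsAssigned.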

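Now, I can usually live with longer polls, but:

  • I want to understand which change caused this change in behavior
  • I want to know whether anyone else has noticed this

I couldn't find any related change in the changelogs, or any mention of this anywhere.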

spring-boot spring-kafka kafka-consumer-api spring-boot-3
1 Answer

So in the end these were two different issues. The test issue is explained as follows:

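  1. My service container was restarted after every test class
  2. This means that every test class has a new consumer connecting to Kafka
  3. The container was stopped using the org.testcontainers.containers.GenericContainer#stop method, which effectively just kills the container
  4. This means the service was not shut down gracefully and never sent a leave-group request to Kafka
  5. In the next test, Kafka waits for the consumers of the killed container (from the previous test) to send heartbeats, for up to the session timeout, which is 45 seconds by default
  6. After that, it rebalances and everything continues to work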

So the problem can be solved in two ways:

  1. Shut the service down gracefully between tests
  2. Set session.timeout.ms to a lower value
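
For the second fix, a minimal sketch of test-only consumer overrides, assuming the Testcontainers broker keeps its default group.min.session.timeout.ms (6000) and group.max.session.timeout.ms (1800000), since a session.timeout.ms outside that range makes the group join fail. The class and method names are hypothetical; only the property keys are real Kafka consumer settings.

```java
import java.util.HashMap;
import java.util.Map;

final class TestConsumerOverrides {

    // Broker-side defaults (assumed unchanged in the test container):
    // group.min.session.timeout.ms = 6000, group.max.session.timeout.ms = 1800000.
    static final int BROKER_MIN_SESSION_MS = 6_000;
    static final int BROKER_MAX_SESSION_MS = 1_800_000;

    // Builds consumer property overrides with a shorter session timeout, so a
    // consumer that was killed without leaving the group is evicted sooner.
    // heartbeat.interval.ms is set to a third of the session timeout, in line
    // with the Kafka docs' advice to keep it no higher than 1/3 of it.
    static Map<String, Object> withSessionTimeout(int sessionTimeoutMs) {
        if (sessionTimeoutMs < BROKER_MIN_SESSION_MS || sessionTimeoutMs > BROKER_MAX_SESSION_MS) {
            throw new IllegalArgumentException(
                    "session.timeout.ms outside the broker's default allowed range: " + sessionTimeoutMs);
        }
        Map<String, Object> props = new HashMap<>();
        props.put("session.timeout.ms", sessionTimeoutMs);
        props.put("heartbeat.interval.ms", sessionTimeoutMs / 3);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(withSessionTimeout(10_000));
    }
}
```

The map could be merged into the configs passed to a DefaultKafkaConsumerFactory in test configuration; with Spring Boot auto-configuration, the same settings can likely be supplied as spring.kafka.consumer.properties.session.timeout.ms and spring.kafka.consumer.properties.heartbeat.interval.ms.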

In the end it had nothing to do with Spring; it came down to the fact that we weren't running these integration tests before moving to 3.0.

The staging startup issue is still unresolved, but I'm going to ignore it, since it's a poor implementation anyway.
