Kafka consumer keeps rebalancing

Problem description (votes: 0, answers: 3)

My Kafka consumer client keeps rebalancing between frequent polls, even when no source records are being processed.

In addition, I have implemented consumer.pause() and consumer.resume() in the right places, to stop the consumer from polling until the polled batch of consumer records has been processed in isolation by the backend API. I'm not sure what is causing this issue.

Consumer logs

2022-07-21 10:27:08; LOG_LEVEL="INFO"; SOURCE="MyKAFKAConsumer"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="Polling..."

2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="MyKAFKAConsumer"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="Polling..."

2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Group coordinator brokerHost:9093 (id: 2147444144 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response.isDisconnected: false. Rediscovery will be attempted."

2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Discovered group coordinator brokerHost:9093 (id: 2147444144 rack: null)"

2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Discovered group coordinator brokerHost:9093 (id: 2147444144 rack: null)"

2022-07-21 10:27:25; LOG_LEVEL="ERROR"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Offset commit failed on partition MySourceTopic-0 at offset 38572: The coordinator is not aware of this member."

2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] OffsetCommit failed with Generation{generationId=18661, memberId='MyKAFKAConsumerClientId1-d410deb4-d6f9-4f7e-b789-f866b89817e6', protocol='range'}: The coordinator is not aware of this member."

2022-07-21 10:27:25; LOG_LEVEL="WARN"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Asynchronous auto-commit of offsets {MySourceTopic-0=OffsetAndMetadata{offset=38572, leaderEpoch=132, metadata=''}, MySourceTopic-2=OffsetAndMetadata{offset=38566, leaderEpoch=122, metadata=''}, MySourceTopic-1=OffsetAndMetadata{offset=38779, leaderEpoch=118, metadata=''}, MySourceTopic-3=OffsetAndMetadata{offset=38585, leaderEpoch=121, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records."

2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Failing OffsetCommit request since the consumer is not part of an active group"

2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Attempt to heartbeat with stale Generation{generationId=18661, memberId='MyKAFKAConsumerClientId1-d410deb4-d6f9-4f7e-b789-f866b89817e6', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, ignoring the error"

2022-07-21 10:27:25; LOG_LEVEL="WARN"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Synchronous auto-commit of offsets {MySourceTopic-0=OffsetAndMetadata{offset=38572, leaderEpoch=132, metadata=''}, MySourceTopic-2=OffsetAndMetadata{offset=38566, leaderEpoch=122, metadata=''}, MySourceTopic-1=OffsetAndMetadata{offset=38779, leaderEpoch=118, metadata=''}, MySourceTopic-3=OffsetAndMetadata{offset=38585, leaderEpoch=121, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group."

2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group"

2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Lost previously assigned partitions MySourceTopic-0, MySourceTopic-2, MySourceTopic-1, MySourceTopic-3"

2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] (Re-)joining group"
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] (Re-)joining group"

2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Successfully joined group with generation Generation{generationId=18663, memberId='MyKAFKAConsumerClientId1-f55c7204-c932-40cd-a9b3-0351f9904026', protocol='range'}"

2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Successfully synced group in generation Generation{generationId=18663, memberId='MyKAFKAConsumerClientId1-f55c7204-c932-40cd-a9b3-0351f9904026', protocol='range'}"

2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Notifying assignor about the new Assignment(partitions=[MySourceTopic-0, MySourceTopic-1, MySourceTopic-2, MySourceTopic-3])"

2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Adding newly assigned partitions: MySourceTopic-0, MySourceTopic-2, MySourceTopic-1, MySourceTopic-3"

2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Setting offset for partition MySourceTopic-0 to the committed offset FetchPosition{offset=38572, offsetEpoch=Optional[132], currentLeader=LeaderAndEpoch{leader=Optional[brokerHost:9093 (id: 33708 rack: null)], epoch=132}}"

2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Setting offset for partition MySourceTopic-2 to the committed offset FetchPosition{offset=38566, offsetEpoch=Optional[122], currentLeader=LeaderAndEpoch{leader=Optional[brokerHost:9093 (id: 39217 rack: null)], epoch=122}}"

2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Setting offset for partition MySourceTopic-1 to the committed offset FetchPosition{offset=38779, offsetEpoch=Optional[118], currentLeader=LeaderAndEpoch{leader=Optional[brokerHost:9093 (id: 30678 rack: null)], epoch=118}}"

2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Setting offset for partition MySourceTopic-3 to the committed offset FetchPosition{offset=38585, offsetEpoch=Optional[121], currentLeader=LeaderAndEpoch{leader=Optional[brokerHost:9093 (id: 38300 rack: null)], epoch=121}}"

2022-07-21 10:27:30; LOG_LEVEL="INFO"; SOURCE="MyKAFKAConsumer"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="Polling..."

2022-07-21 10:27:35; LOG_LEVEL="INFO"; SOURCE="MyKAFKAConsumer"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="Polling..."

Consumer configuration

    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    enable.auto.commit = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    heartbeat.interval.ms = 3000
    max.poll.interval.ms = 300000
    max.poll.records = 100
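For reference, the settings listed above can be assembled into a plain java.util.Properties object as shown below. Note that bootstrap.servers, group.id, and any deserializer settings are placeholders added for illustration, not values taken from the question:

```java
import java.util.Properties;

public class MyConsumerConfig {
    // Reproduces the configuration listed above; values not shown in the
    // question (bootstrap.servers, group.id) are placeholders.
    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "brokerHost:9093"); // placeholder
        props.put("group.id", "MyKAFKAConsumeGroupId1");   // placeholder
        props.put("auto.commit.interval.ms", "5000");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "true");
        props.put("fetch.max.bytes", "52428800");
        props.put("fetch.max.wait.ms", "500");
        props.put("fetch.min.bytes", "1");
        props.put("heartbeat.interval.ms", "3000");
        props.put("max.poll.interval.ms", "300000");
        props.put("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("max.poll.records")); // prints 100
    }
}
```

With these values, the consumer may return up to 100 records per poll, and all of them must be processed before the next poll() call, which must happen within 300 seconds.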

Consumer poll() implementation

        while (true) {
            try {
                LOGGER.info("Polling...");
                ConsumerRecords<String, GenericRecord> records =
                        consumer.poll(Duration.ofSeconds(5));

                if (records != null && records.count() > 0) {
                    consumer.pause(consumer.assignment());
                    recordExecutor.execute(records);
                }

                consumer.resume(consumer.assignment());
                consumer.commitSync();

            } catch (SerializationException e) {
                continue;
            } catch (Exception e) {
                continue;
            }
        }

Thanks in advance.

apache-kafka kafka-consumer-api
3 Answers
Answer 1 (0 votes)

A few points I would like to mention here.

  1. Ideally, you should not use consumer.pause() and consumer.resume() for your use case. If there are no records available, consumer.poll() will simply block for up to the duration passed as its argument. So it should not cause any high CPU usage, since it is not a non-blocking call.

     From the documentation of consumer.poll():

     This method returns immediately if there are records available. Otherwise, it will await the passed timeout. If the timeout expires, an empty record set will be returned.

  2. max.poll.interval.ms is not the same thing as the timeout you pass to consumer.poll(). If the two were the same, consumer.poll() would not take that parameter in the first place. max.poll.interval.ms is the maximum allowed delay between calls to poll().

     So if, when there are no records, you wait longer than max.poll.interval.ms before the next poll() call, the consumer is considered failed, because no poll() happened within that timeout.

     From the documentation of max.poll.interval.ms:

     If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.

  3. Also, you seem to be calling commitSync() while your configuration has enable.auto.commit=true. Ideally, you should use only one of the two.

  4. If you do not want to rethrow the exceptions, at least log them in the catch blocks.
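Putting these points together, a minimal sketch of what the loop could look like with pause()/resume() removed, auto-commit disabled (enable.auto.commit=false), and exceptions logged. It reuses the consumer, recordExecutor, and LOGGER objects from the question's snippet, so it is a sketch rather than a drop-in implementation:

```java
while (true) {
    try {
        // poll() blocks for up to 5 s when no records are available,
        // so pause()/resume() is not needed to avoid busy-looping.
        ConsumerRecords<String, GenericRecord> records =
                consumer.poll(Duration.ofSeconds(5));

        if (!records.isEmpty()) {
            // Processing the batch must finish well within
            // max.poll.interval.ms (300 s here), or the group rebalances.
            recordExecutor.execute(records);
            // Manual commit after successful processing; requires
            // enable.auto.commit=false so the two do not conflict.
            consumer.commitSync();
        }
    } catch (SerializationException e) {
        LOGGER.error("Failed to deserialize record, skipping", e);
    } catch (Exception e) {
        LOGGER.error("Record processing failed", e);
    }
}
```

Committing only after recordExecutor.execute() returns means a record is never marked as consumed before the backend API has actually processed it.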


Answer 2 (0 votes)

After extensive analysis, we identified the root cause of these frequent rebalances: session timeouts between the consumers/streams and the group coordinator (the leader broker). The session timeouts kept occurring because of a capacity problem on the servers hosting these consumers/streams (100% CPU utilization). When the CPU saturates at 100%, all of these Kafka clients become unresponsive and the sessions eventually time out.

Event log:

groupId=MyKAFKAConsumeGroupId1] Group coordinator brokerHost:9093 (id: 21474 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response. isDisconnected: false. Rediscovery will be attempted.


Answer 3 (0 votes)

2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Group coordinator brokerHost:9093 (id: 2147444144 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response. isDisconnected: false. Rediscovery will be attempted."

This error occurs when you hit a session timeout on the Kafka consumer, i.e. when the group coordinator does not receive a heartbeat from the consumer's heartbeat thread within the specified time limit (oddly, the Kafka consumer logs this under INFO). In Java, the configuration property that controls this is session.timeout.ms. Eventually the offset commits fail and your application goes through a rebalance. I ran into this in my own application, and it was an annoying problem.

The interesting part is why we hit this error in the first place. We profiled the application's memory with Java Flight Recorder and noticed that there were always a few long GC pauses around the time the session timeouts occurred. In addition, we were using an older version of the Kafka API, in which the default session timeout was only 10 seconds; in recent versions it has been increased to 45 seconds.

Once I changed the session.timeout.ms property to 45 seconds, this error went away.
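As a sketch, the change amounts to raising session.timeout.ms in the consumer's configuration. The property keys are the standard Kafka client ones; the heartbeat value below is illustrative, following the common guideline of keeping it at no more than about a third of the session timeout:

```java
import java.util.Properties;

public class SessionTimeoutFix {
    // Returns consumer properties with the session timeout raised to 45 s
    // (the default in newer Kafka clients; older clients defaulted to 10 s).
    static Properties configure() {
        Properties props = new Properties();
        // Other consumer settings would go here...
        props.put("session.timeout.ms", "45000");
        // Conventionally no more than ~1/3 of the session timeout, so
        // several heartbeats can be missed before the session expires.
        props.put("heartbeat.interval.ms", "15000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(configure().getProperty("session.timeout.ms")); // prints 45000
    }
}
```

Note that session.timeout.ms must stay within the broker-side bounds (group.min.session.timeout.ms and group.max.session.timeout.ms), or the broker will reject the consumer's join request.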
