Kafka Streams - rebalance exception in Kafka 1.0.0

Problem description · votes: 0 · answers: 2

With Kafka Streams 1.0.0 we are seeing a strange error. My streams application ingests a single Kafka topic and emits multiple aggregations to different state stores. The application currently runs on a 2-node cluster, but when a third node is added, the streams application crashes on all nodes.

Below is the exception I get:

2017-12-12 15:32:55 ERROR Kafka010Base:47 - Exception caught in thread c-7-aq32-5648256f-9142-49e2-98c0-da792e6da48e-StreamThread-5
org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: The server experienced an unexpected error when processing the request
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:566)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:539)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)

Looking further on the broker side, I saw another exception:

[2017-12-12 17:28:36,822] INFO [GroupCoordinator 90]: Preparing to rebalance group c-7-aq32 with old generation 25 (__consumer_offsets-39) (kafka.coordinator.group.GroupCoordinator)
[2017-12-12 17:28:40,500] INFO [GroupCoordinator 90]: Stabilized group c-7-aq32 generation 26 (__consumer_offsets-39) (kafka.coordinator.group.GroupCoordinator)
[2017-12-12 17:28:42,290] INFO [GroupCoordinator 90]: Assignment received from leader for group c-7-aq32 for generation 26 (kafka.coordinator.group.GroupCoordinator)
[2017-12-12 17:28:42,300] ERROR [GroupMetadataManager brokerId=90] Appending metadata message for group c-7-aq32 generation 26 failed due to org.apache.kafka.common.errors.RecordTooLargeException, returning UNKNOWN error code to the client (kafka.coordinator.group.GroupMetadataManager)
[2017-12-12 17:28:42,301] INFO [GroupCoordinator 90]: Preparing to rebalance group c-7-aq32 with old generation 26 (__consumer_offsets-39) (kafka.coordinator.group.GroupCoordinator)
[2017-12-12 17:28:52,301] INFO [GroupCoordinator 90]: Member c-7-aq32-6138ec53-4aff-4596-8f4b-44ae6f5d72da-StreamThread-13-consumer-e0cc0931-0619-4908-82c2-28f7bf9bace9 in group c-7-aq32 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

Now, I need help understanding why I am getting the RecordTooLargeException.

apache-kafka apache-kafka-streams
2 Answers
2 votes

Changing the following broker property solved the problem:

message.max.bytes=5242880   # 5 MB

The default value of this property is roughly 1 MB. When the consumers in a consumer group rebalance, the group leader sends the assignment metadata for all members back to the coordinator, which appends that group metadata as a single record to the internal __consumer_offsets topic. If the serialized metadata is larger than the broker's maximum message size, the append fails and the broker raises a RecordTooLargeException.
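If you would rather not raise the limit cluster-wide, an alternative (my own suggestion, not from the original answer) is to override `max.message.bytes` only on the internal `__consumer_offsets` topic, since that is the topic the group metadata record is written to. In Kafka 1.0 topic configs were altered through ZooKeeper; the ZooKeeper address below is a placeholder:

```shell
# Raise the message size limit only for the offsets topic,
# leaving message.max.bytes at its default for all other topics.
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name __consumer_offsets \
  --add-config max.message.bytes=5242880
```

A topic-level override keeps the blast radius small: oversized producer batches to regular topics are still rejected at the default limit.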

Although the question is old, I recently ran into a similar issue as well.


0 votes

I am facing the same problem. Is there a way to determine the size of the metadata? How can we be sure that 5 MB, rather than the default 1 MB, is enough?
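Kafka does not expose an exact figure, but you can make a back-of-the-envelope estimate: the record the coordinator writes scales with the number of members and, per member, with the topic names and assigned partitions in its serialized assignment (a topic name string plus a 4-byte integer per partition, roughly following the ConsumerProtocol encoding). The helper below is my own rough sketch, not Kafka code; the topic names, counts, and the 100-byte per-member overhead are illustrative assumptions:

```python
# Rough estimate of the group metadata record size written to
# __consumer_offsets during a rebalance. This approximates Kafka's
# ConsumerProtocol assignment encoding; it is not an exact reimplementation.

def estimate_assignment_bytes(num_members, topics, per_member_overhead=100):
    """topics: dict mapping topic name -> total partition count.
    Assumes partitions are spread evenly across members."""
    per_member = per_member_overhead  # member id, client id, host, headers
    for topic, partitions in topics.items():
        parts_per_member = max(1, partitions // num_members)
        # 2-byte string length + topic name + 4-byte array length
        # + 4 bytes per assigned partition
        per_member += 2 + len(topic) + 4 + 4 * parts_per_member
    return num_members * per_member

# Hypothetical example: 30 stream threads, one source topic plus the many
# internal changelog/repartition topics a Kafka Streams app creates.
topics = {f"app-store-{i}-changelog": 64 for i in range(20)}
topics["input-topic"] = 64
size = estimate_assignment_bytes(30, topics)
print(size)  # compare this against the broker's message.max.bytes
```

Note that Kafka Streams inflates the member count (one consumer per stream thread) and the topic count (changelog and repartition topics), which is why Streams apps hit this limit far sooner than plain consumer groups; if the estimate is anywhere near the limit, give yourself generous headroom.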
