群组成员支持的协议与现有成员不兼容

问题描述 投票:0回答:4

我面临与 Kafka 相关的问题。

我当前的服务 (

Producer
) 将消息发送到 Kafka 主题 (
events
)。该服务使用
kafka_2.12 v1.0.0
,用 Java 编写。

我正在尝试将其与

spark-streaming
的示例项目集成为
Consumer
服务(此处使用kafka_2.11 v0.10.0,用Scala编写)

消息已成功从

Producer
发送到Kafka主题。但是,我总是收到以下错误堆栈:

Exception in thread "main" org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members.
    at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:577)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:571)    at com.jj.streaming.ItemApp$.delayedEndpoint$com$jj$streaming$ItemApp$1(ItemApp.scala:72)
    at com.jj.streaming.ItemApp$delayedInit$body.apply(ItemApp.scala:12)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)     at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)   at scala.App$class.main(App.scala:76)
    at com.jj.streaming.ItemApp$.main(ItemApp.scala:12)
    at com.jj.streaming.ItemApp.main(ItemApp.scala)

我不知道根本原因。我该如何解决这个问题?

apache-spark apache-kafka spark-streaming
4个回答
5
投票

当我尝试将消费者添加到使用与之前不同的分区分配策略的集群时,在我的配置中会发生这种情况。

例如:

partition.assignment.strategy=org.apache.kafka.clients.consumer.RandomAccessAssignor

混合或默认为:

partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor

1
投票

正如 @john Cairns 和 @Iraj Hedyati 所指出的,检查分配给消费者组的分配策略。不同的客户端创建具有不同默认策略的消费者组。例如

当我使用kafka命令行客户端(java)时,它使用“范围”策略创建了一个消费者组。

/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group aeprocessor5 --state

GROUP                     COORDINATOR (ID)          ASSIGNMENT-STRATEGY  STATE           #MEMBERS
aeprocessor5              172.16.1.11:9092 (1003)   range                Stable          1

而当我使用

sarama
库使用 go 客户端创建消费者组时,它使用“循环策略”。

/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group aeprocessor5 --state

GROUP                     COORDINATOR (ID)          ASSIGNMENT-STRATEGY  STATE           #MEMBERS
aeprocessor5              172.16.1.11:9092 (1003)   roundrobin           Stable          1

因此,如果该组已经存在并且具有不同的策略,则会报告

InconsistentGroupProtocolException


0
投票

异常问题的根本原因

InconsistentGroupProtocolException
已经在另一个答案中描述过(我们在同一个消费者组中同时有不同的分区分配策略),但我想添加如何在不停机的情况下解决问题,如果我们需要有意更改分配策略(例如,为了使用新的合格策略减少 Kafka 重新平衡延迟),或者由于切换到另一个 Kafka 消费者库。

首先,我们需要弄清楚以前和新的所需分区分配策略是什么(在另一个答案中也提到了如何定义前一个),然后,我们应该在新的消费者组中明确指定这两种策略发布。 为此,需要指定属性

partition.assignment.strategy
或基于 Kafka 消费者库的类似属性(例如 Camel Kafka 的
partitionAssignor

partition.assignment.strategy=strategy1,strategy2

如果为一个消费者组指定了多个分区分配策略,则策略的优先级由它们列出的顺序决定。列出的第一个策略将具有最高优先级,其次是第二个策略,依此类推。在重新平衡期间,消费者组协调器尝试使用具有最高优先级的分区分配策略。如果该策略失败或遇到任何问题,协调器将退回到列表中的下一个策略。

就我而言,我需要从

org.apache.kafka.clients.consumer.RangeAssignor
切换到
org.apache.kafka.clients.consumer.CooperativeStickyAssignor
,所以我指定了以下内容

partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor

这样,我们就可以在没有任何停机时间的情况下进行平滑部署,并且在进一步的版本中,我们可以只保留单一策略。


-1
投票

对我来说,这是因为我不小心使用同一个消费者组订阅了2个不同的主题。

© www.soinside.com 2019 - 2024. All rights reserved.