如何修复:
Group authorization failed
尝试使用 python 客户端使用来自 Kafka 的消息时出现错误?
相同的设置在 Kafka CLI 中工作正常。
通常这个错误应该指向无效的权限,但是,当 CLI 工作时,我怀疑它一定是别的东西。
我的 kafka 实例启用了 SASL/SSL。使用以下设置:
src_schema_registry_conf = {
'url': 'http://<<host>>:8081'
}
src_schema_registry_client = SchemaRegistryClient(src_schema_registry_conf)
string_deserializer = StringDeserializer('utf_8')
avro_deserializer = AvroDeserializer(src_schema_registry_client)#,
src_conf = {
"bootstrap.servers": '<<host>>:9093',
"ssl.ca.location": 'certs/catrust.pem',
"security.protocol":"SASL_SSL",
"sasl.mechanism":"PLAIN",
"sasl.username":"<<username>",
"sasl.password":'<<secret_pasword',
'key.deserializer': string_deserializer,
'value.deserializer': avro_deserializer,
'group.id': "test-consumer-group",
'auto.offset.reset': "earliest"
}
消息的消费从以下位置开始:
consumer = DeserializingConsumer(src_conf)
consumer.subscribe(['<<topic>>'])
while True:
try:
# SIGINT can't be handled when polling, limit timeout to 1 second.
msg = consumer.poll(1.0)
if msg is None:
continue
user = msg.value()
if user is not None:
print("record {}: value: {}\n"
.format(msg.key(), user))
except KeyboardInterrupt:
break
consumer.close()
我试图按照这里的例子进行操作:https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro_consumer.py 但无法让它在我的环境中工作。
原来答案比想象中简单:
用户名必须写成
ALLCAPS
。然后它被接受。