Running Kafka in KRaft mode with SASL_PLAINTEXT


Hello, I am trying to run Kafka in KRaft mode with SASL_PLAINTEXT enabled, and I am getting an error.

My configuration looks like this:

process.roles=broker,controller
node.id=2
controller.quorum.voters=1@kraft1:9093,2@kraft2:9093,3@kraft3:9093,4@kraft4:9093
listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=SASL_PLAINTEXT://kraft2:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/kafka/kraft-logs
num.partitions=16
default.replication.factor=4
min.insync.replicas=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
log.retention.hours=1
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

security.inter.broker.protocol= SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.controller.protocol=PLAIN
authorizer.class.name=kafka.security.authorizer.AclAuthorizer

My kafka_jaas.conf looks like this:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    serviceName="kafka"
    username="admin"
    password="admin"
    user_admin="admin"
    user_alice="alice"
    user_bob="bob"
    user_charlie="charlie";
};

I get the following error and don't know what to do next:

 java.lang.NullPointerException: Cannot invoke "String.indexOf(String)" because "connectString" is null
        at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:2250)
        at kafka.security.authorizer.AclAuthorizer.configure(AclAuthorizer.scala:211)
        at kafka.server.ControllerServer.$anonfun$startup$11(ControllerServer.scala:154)
        at kafka.server.ControllerServer.$anonfun$startup$11$adapted(ControllerServer.scala:154)
        at scala.Option.foreach(Option.scala:407)
        at kafka.server.ControllerServer.startup(ControllerServer.scala:154)
        at kafka.server.KafkaRaftServer.$anonfun$startup$1(KafkaRaftServer.scala:95)
        at kafka.server.KafkaRaftServer.$anonfun$startup$1$adapted(KafkaRaftServer.scala:95)
        at scala.Option.foreach(Option.scala:407)
        at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:95)
        at kafka.Kafka$.main(Kafka.scala:113)
        at kafka.Kafka.main(Kafka.scala)

What am I doing wrong?

I have tried to find a solution, but I am stuck.

apache-kafka sasl kraft
1 Answer

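The built-in kafka.security.authorizer.AclAuthorizer requires ZooKeeper. In KRaft mode zookeeper.connect is not set, which is why AclAuthorizer.configure fails inside KafkaZkClient with a null connectString in your stack trace.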

In KRaft mode, the built-in authorizer you can use is org.apache.kafka.metadata.authorizer.StandardAuthorizer.
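
As a rough sketch, the change to the server.properties from the question might look like the following. The super.users line is an assumption, not something the question states: it presumes that admin (the inter-broker user in kafka_jaas.conf) should bypass ACL checks, which is a common choice while no ACLs exist yet.

```properties
# Replace the ZooKeeper-backed AclAuthorizer with the KRaft-compatible authorizer.
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer

# Assumption: treat the inter-broker user from kafka_jaas.conf as a super user,
# so broker-to-broker requests are not denied before any ACLs have been created.
# Note: connections arriving on a PLAINTEXT listener are seen as User:ANONYMOUS.
super.users=User:admin
```

With a setup like this, the other users (alice, bob, charlie) would still need ACLs granted before they can access topics, for example with kafka-acls.sh.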

For details on this authorizer, see KIP-801.
