Hi, I am trying to run Kafka in KRaft mode with SASL_PLAINTEXT enabled, and I am getting an error.
My configuration looks like this:
process.roles=broker,controller
node.id=2
controller.quorum.voters=1@kraft1:9093,2@kraft2:9093,3@kraft3:9093,4@kraft4:9093
listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=SASL_PLAINTEXT://kraft2:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/kafka/kraft-logs
num.partitions=16
default.replication.factor=4
min.insync.replicas=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
log.retention.hours=1
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.controller.protocol=PLAIN
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
My kafka_jaas.conf looks like this:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="kafka"
username="admin"
password="admin"
user_admin="admin"
user_alice="alice"
user_bob="bob"
user_charlie="charlie";
};
This is the error I get, and I don't know what to do next:
java.lang.NullPointerException: Cannot invoke "String.indexOf(String)" because "connectString" is null
at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:2250)
at kafka.security.authorizer.AclAuthorizer.configure(AclAuthorizer.scala:211)
at kafka.server.ControllerServer.$anonfun$startup$11(ControllerServer.scala:154)
at kafka.server.ControllerServer.$anonfun$startup$11$adapted(ControllerServer.scala:154)
at scala.Option.foreach(Option.scala:407)
at kafka.server.ControllerServer.startup(ControllerServer.scala:154)
at kafka.server.KafkaRaftServer.$anonfun$startup$1(KafkaRaftServer.scala:95)
at kafka.server.KafkaRaftServer.$anonfun$startup$1$adapted(KafkaRaftServer.scala:95)
at scala.Option.foreach(Option.scala:407)
at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:95)
at kafka.Kafka$.main(Kafka.scala:113)
at kafka.Kafka.main(Kafka.scala)
What am I doing wrong?
I have tried to find a solution, but I am stuck.
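The built-in kafka.security.authorizer.AclAuthorizer requires ZooKeeper, which is why its configure() call fails in KafkaZkClient with a null connectString: a KRaft cluster has no zookeeper.connect setting.
In KRaft mode, the built-in authorizer you can use is org.apache.kafka.metadata.authorizer.StandardAuthorizer.
See KIP-801 for details on this authorizer.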
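As a rough sketch of the change, the authorizer line in the server properties from the question could be swapped as shown here. The super.users entry is an assumption on my part, not something from the question: it presumes the brokers authenticate as the "admin" user defined in kafka_jaas.conf, and it is there so that inter-broker requests are not denied before any ACLs exist. Adjust or drop it to match your own principals.

```properties
# Use the KRaft-native authorizer instead of the ZooKeeper-backed AclAuthorizer.
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer

# Assumption: "admin" is the principal the brokers use for inter-broker traffic
# (taken from username="admin" in kafka_jaas.conf). Not part of the original config.
super.users=User:admin
```

The same setting would need to go on every node in the quorum, since each node with the controller role loads the authorizer at startup.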