I am trying to set up SASL_PLAINTEXT in Kafka. I went through the documentation and made the necessary configuration changes, as described below.
Copied config/server.properties to config/server_sasl_plain.properties and added the following:
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=true
auto.create.topics.enable=true
listeners=SASL_PLAINTEXT://localhost:9094
advertised.listeners=SASL_PLAINTEXT://localhost:9094
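The copy-and-append step above could be scripted roughly as follows. This is only a sketch: the function name `make_sasl_server_config` is hypothetical, and it assumes that appending is acceptable because, when a key appears twice in a Java properties file, the later occurrence takes effect.

```shell
# Hypothetical helper: copy a broker config and append the SASL/PLAIN
# settings listed in the post. Not part of the Kafka distribution.
make_sasl_server_config() {
  src="$1"   # e.g. config/server.properties
  dst="$2"   # e.g. config/server_sasl_plain.properties
  cp "$src" "$dst" || return 1
  # The leading blank line guards against a source file without a trailing newline.
  cat >> "$dst" <<'EOF'

security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=true
auto.create.topics.enable=true
listeners=SASL_PLAINTEXT://localhost:9094
advertised.listeners=SASL_PLAINTEXT://localhost:9094
EOF
}
```

It would be called as `make_sasl_server_config config/server.properties config/server_sasl_plain.properties` from the Kafka home directory.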
Added the following lines to zookeeper.properties:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
Then copied consumer.properties to consumer_sasl_plain.properties and added the following:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
Similarly, created producer_sasl_plain.properties and added the following:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
Created config/zookeeper_jaas.conf with the following content:
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_super="zookeeper"
user_admin="admin-secret";
};
Created kafka_server_jaas.conf with the following content:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="admin-secret";
};
Created kafka_client_jaas.conf with the following content:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="admin-secret";
};
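As an alternative to pointing the client at a JAAS file, Kafka clients also accept the login module inline through the `sasl.jaas.config` property, which is the same setting the Spark options later in this post pass as `kafka.sasl.jaas.config`. A minimal sketch, assuming a placeholder file name `client_sasl_inline.properties` that is not part of the original setup:

```shell
# Write a client properties file that carries the credentials inline.
# The file name is a placeholder chosen for this sketch.
props_file="client_sasl_inline.properties"
cat > "$props_file" <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";
EOF
```

Such a file could then be passed with --consumer.config or --producer.config, so the console clients would not depend on KAFKA_OPTS for their credentials.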
Started ZooKeeper as follows:
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/KAFKA_HOME/config/zookeeper_jaas.conf"
$ bin/zookeeper-server-start.sh config/zookeeper.properties
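Since every process here gets its JAAS file through KAFKA_OPTS, a typo in the option (such as a missing leading `-`) or a wrong path is easy to miss. The check below is a sketch of how the value could be validated before starting a process. The function name `check_jaas_opts` is hypothetical, and it assumes KAFKA_OPTS holds only this one option.

```shell
# Hypothetical pre-flight check for the value intended for KAFKA_OPTS.
# Returns 0 if the option is well formed and the JAAS file is readable,
# 1 if the option prefix is wrong, 2 if the file cannot be read.
check_jaas_opts() {
  prefix="-Djava.security.auth.login.config="
  case "$1" in
    "$prefix"*) ;;
    *) echo "expected value to start with $prefix" >&2; return 1 ;;
  esac
  jaas_file="${1#"$prefix"}"   # strip the prefix, leaving the path
  if [ ! -r "$jaas_file" ]; then
    echo "JAAS file not readable: $jaas_file" >&2
    return 2
  fi
  return 0
}
```

For example, `check_jaas_opts "$KAFKA_OPTS" && bin/zookeeper-server-start.sh config/zookeeper.properties` would start ZooKeeper only when the check passes.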
Started the broker as follows:
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/KAFKA_HOME/config/kafka_server_jaas.conf"
$ bin/kafka-server-start.sh -daemon config/server_sasl_plain.properties
Started the consumer as follows:
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/KAFKA_HOME/config/kafka_client_jaas.conf"
$ ./bin/kafka-console-consumer.sh --topic test-topic --from-beginning --consumer.config=config/consumer_sasl_plain.properties --bootstrap-server=localhost:9094
Started the producer as follows:
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/KAFKA_HOME/config/kafka_client_jaas.conf"
$ ./bin/kafka-console-producer.sh --broker-list localhost:9094 --topic test-topic --producer.config=config/producer_sasl_plain.properties
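I am not sure, but it seems the messages are actually being published from the producer to Kafka. To verify this, I checked the broker's log directory for the topic, and I can see the messages there by tailing the whole log directory :).
To verify it further, I wrote a small Scala program that reads the Kafka messages from the topic, as in the example below: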
import org.apache.spark.sql.Encoders

// Assumes an existing SparkSession named `spark` (for example, in spark-shell).
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9094")
  .option("kafka.security.protocol", "SASL_PLAINTEXT")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.sasl.jaas.config", """
    |org.apache.kafka.common.security.plain.PlainLoginModule required
    |username="admin"
    |password="admin-secret";
    |""".stripMargin)
  .option("subscribe", "test-topic") // Topic to subscribe to
  .option("startingOffsets", "earliest") // Start from the earliest offset
  .load()

// Decode the binary value column as a string and print the rows.
df.selectExpr("CAST(value AS STRING)").as(Encoders.STRING).show()