我正试图使用SASL Plain将Kafka Java Client连接到Kafka broker。但是当我尝试从生产者发送消息时,Kafka服务器记录了以下错误。
[2020-04-30 14:48:14,955] INFO [SocketServer brokerId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
看上去,生产者试图在SASL握手之前发送一个元数据请求。如何在发送消息之前进行握手?
以下是我的 kafka_server_jaas.conf
文件,它是用于Kafka服务器的。
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
以下是我的 zookeeper_jaas.conf
文件,它是用于zookeeper的。
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
在我的Java生产者中,我设置了以下属性。
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin_secret\"");
properties.put("sasl.mechanisms", "PLAIN");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer kafkaProducer = new KafkaProducer(properties);
我有什么地方做错了吗?
你需要指定 security.protocol
否则,默认情况下,Kafka客户端不使用 SASL
.
在你的客户端属性中,添加。
properties.put("security.protocol", "SASL_SSL");
还有... SASL_PLAINTEXT
但不建议使用 PLAIN
机制之上 SASL_PLAINTEXT
因为实际上密码将以明文交换。