Messages not reaching Kafka consumer with SASL_PLAINTEXT configuration


I am trying to set up SASL_PLAINTEXT authentication in Kafka. I went through the documentation and made the necessary configuration changes, as described below:

Duplicated config/server.properties as config/server_sasl_plain.properties and added the following lines:

security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=true
auto.create.topics.enable=true
listeners=SASL_PLAINTEXT://localhost:9094
advertised.listeners=SASL_PLAINTEXT://localhost:9094

Added the following lines to zookeeper.properties:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000 

Then copied consumer.properties to consumer_sasl_plain.properties and added the following:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

Similarly, created producer_sasl_plain.properties and added the following:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

Created config/zookeeper_jaas.conf with the following content:

Server {
   org.apache.zookeeper.server.auth.DigestLoginModule required
   user_super="zookeeper"
   user_admin="admin-secret";
};

Created kafka_server_jaas.conf with the following content:

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="admin-secret";
};

Created kafka_client_jaas.conf with the following content:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret";
};
Client {
  org.apache.zookeeper.server.auth.DigestLoginModule required
  username="admin"
  password="admin-secret";
};
 

Started zookeeper as follows:

$ export KAFKA_OPTS="-Djava.security.auth.login.config=/KAFKA_HOME/config/zookeeper_jaas.conf"
$ bin/zookeeper-server-start.sh config/zookeeper.properties

Started the broker as follows:

$ export KAFKA_OPTS="-Djava.security.auth.login.config=/KAFKA_HOME/config/kafka_server_jaas.conf"
$ bin/kafka-server-start.sh -daemon config/server_sasl_plain.properties

Started the consumer as follows:

$ export KAFKA_OPTS="-Djava.security.auth.login.config=/KAFKA_HOME/config/kafka_client_jaas.conf"
$ ./bin/kafka-console-consumer.sh --topic test-topic --from-beginning --consumer.config=config/consumer_sasl_plain.properties --bootstrap-server=localhost:9094

Started the producer as follows:

$ export KAFKA_OPTS="-Djava.security.auth.login.config=/KAFKA_HOME/config/kafka_client_jaas.conf"
$ ./bin/kafka-console-producer.sh --broker-list localhost:9094 --topic test-topic --producer.config=config/producer_sasl_plain.properties

I am able to produce messages from the producer, but nothing shows up on the consumer.
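
One way to check whether the admin/admin-secret credentials are being accepted at all on the SASL_PLAINTEXT listener is a small kafka-clients AdminClient program. The sketch below is illustrative only: it assumes the kafka-clients library is on the classpath and reuses the credentials from kafka_client_jaas.conf; if authentication is the problem, the listTopics call fails instead of printing the topic list.

import java.util.Properties
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import org.apache.kafka.common.config.SaslConfigs

object SaslConnectivityCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Same listener and credentials as consumer_sasl_plain.properties / kafka_client_jaas.conf
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094")
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN")
    props.put(SaslConfigs.SASL_JAAS_CONFIG,
      """org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";""")

    val admin = AdminClient.create(props)
    try {
      // If the PLAIN credentials are rejected, this call fails instead of returning the topic list
      val topics = admin.listTopics().names().get()
      println(s"Topics visible over SASL_PLAINTEXT: $topics")
    } finally {
      admin.close()
    }
  }
}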

apache-kafka kafka-consumer-api
1 Answer

I'm not sure, but it seems the messages are actually being published from the producer to Kafka. To verify this, I checked the topic's directory under the broker's log dir and could see the messages by tailing the files there :).

To verify it further, I wrote a small Scala program to read the Kafka messages from the topic, as in the example below:

  import org.apache.spark.sql.{Encoders, SparkSession}

  // Local session just for this quick check
  val spark = SparkSession.builder()
    .appName("sasl-plaintext-check")
    .master("local[*]")
    .getOrCreate()

  val df = spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9094")
    .option("kafka.security.protocol", "SASL_PLAINTEXT")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config",
      """org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";""")
    .option("subscribe", "test-topic")                     // Topic to subscribe to
    .option("startingOffsets", "earliest")                 // Starting offset
    .load()

  df.selectExpr("CAST(value AS STRING)").as(Encoders.STRING).show()
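
A bare kafka-clients consumer with the same SASL properties is another cross-check against the console consumer. This is a rough sketch only, assuming the same admin/admin-secret credentials as above and a made-up sasl-check consumer group:

import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

object SaslPlainConsumerCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Same settings as consumer_sasl_plain.properties, with the credentials supplied inline
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "sasl-check")           // hypothetical group id
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")    // read from the beginning
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put("security.protocol", "SASL_PLAINTEXT")
    props.put("sasl.mechanism", "PLAIN")
    props.put("sasl.jaas.config",
      """org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";""")

    val consumer = new KafkaConsumer[String, String](props)
    try {
      consumer.subscribe(Collections.singletonList("test-topic"))
      // Poll a few times; every record printed here proves the SASL consume path end to end
      for (_ <- 1 to 10) {
        consumer.poll(Duration.ofSeconds(1)).forEach(r => println(s"${r.offset()}: ${r.value()}"))
      }
    } finally {
      consumer.close()
    }
  }
}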