您可以使用以下选项定义 Kafka 连接:
options = {
"kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";',
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.bootstrap.servers": bootstrap_servers,
"group.id": group_id,
"subscribe": topic,
}
df = spark.readStream.format("kafka").options(**options).load()
此代码使用带有纯文本凭据的 SASL_SSL 安全协议设置连接到 Kafka 代理所需的配置。 然后,它将流数据从指定的 Kafka 主题读取到 DataFrame df 中。
在生产中,建议将 JAAS 配置存储在名为 jaas.conf 的文件中,并从 Spark 应用程序中删除
kafka.sasl.jaas.config
选项。
相反,您应该使用 --driver-java-options 标志将文件路径传递给 Spark-submit 命令。
options = {
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.bootstrap.servers": bootstrap_servers,
"group.id": group_id,
"subscribe": topic,
}
df = spark.readStream.format("kafka").options(**options).load()
对于 Spark-submit,使用 --driver-java-options 提供 jaas.conf 的文件路径:
spark-submit \
--driver-java-options -Djava.security.auth.login.config=/path/to/jaas.conf \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 yourapp.py
确保根据您的应用程序的要求调整文件路径和包版本。
参考: SO 链接