我正在尝试使用sparklyr连接到安全的Kafka服务器。但是,要访问它,您需要指定正确的安全设置(协议,密码等)。但是,在read_options中指定它们时,它们不会传递到使用者配置。这是R代码:
library(sparklyr)
config <- spark_config()
config$sparklyr.shell.packages <- "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0"
sc <- spark_connect(master = "local",config=config, version="2.4.0")
read_options <- list(
kafka.bootstrap.servers='test.server',
group.id="name",
security.protocol='SSL',
ssl.key.password="password",
ssl.keystore.location="C:/Users/...",
ssl.keystore.password="password",
ssl.truststore.location="C:/Users/...",
ssl.truststore.password="password",
subscribe = "topic")
stream <- stream_read_kafka(sc, options = read_options)
如果我们查看spark的日志,则使用者配置中仅列出服务器:(简化版)
INFO ConsumerConfig: ConsumerConfig values:
bootstrap.servers = [test.server]
....
group.id = spark-kafka-source-7bb43fe7-56b2-4e19-9162-371e4db2075a-1047255113-driver-2
....
security.protocol = PLAINTEXT
...
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
..
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
是否有可能/解决方法向用户添加必要的设置?
更新
请参阅user1278798的回答
对于具有相同问题的人,重要的是要添加,并非火花支持所有设置(例如group.id或auto.offset.reset)。只需检查一下user1278798给出的链接即可。
正如the official documentation中明确解释的>>
Kafka自己的配置可以通过
DataStreamReader.option
带有kafka.
前缀]进行设置,例如stream.option("kafka.bootstrap.servers", "host:port")
。您的选项缺少前缀。