How can I skip the ssl.truststore.password property when reading from Kafka with Spark Structured Streaming?


I am trying to read from Kafka with Spark Structured Streaming and write to a Kudu sink. Below is the read and write code.

I am using Spark 2.2.0.

val kafkaDataFrame = spark
  .readStream
  .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
  .option("kafka.bootstrap.servers", kafkaReaderConfig.kafka_brokers)
  // Kerberos over TLS: SASL_SSL with the GSSAPI mechanism
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "GSSAPI")
  // Truststore location is set, but we have no truststore password to pass
  .option("kafka.ssl.truststore.location", kafkaReaderConfig.trust_jks_file_path)
  .option("kafka.sasl.jaas.config", jaas_config_str)
  .option("subscribe", kafkaReaderConfig.topics_set)
  .load()
  .selectExpr("CAST(value AS STRING) as value")

// After transformation

  dfStrm.writeStream
    .option("checkpointLocation", path)
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .foreach(new KuduStreamWriter(tconfig))
    .outputMode("append")
    .start()
    .awaitTermination()

But I get the following exception:

20/05/07 10:59:00 INFO authenticator.AbstractLogin: Successfully logged in.
20/05/07 10:59:00 INFO kerberos.KerberosLogin: TGT refresh thread started.
20/05/07 10:59:00 INFO kerberos.KerberosLogin: TGT valid starting at: Thu May 07 10:58:17 UTC 2020
20/05/07 10:59:00 INFO kerberos.KerberosLogin: TGT expires: Thu May 07 20:58:16 UTC 2020
20/05/07 10:59:00 INFO kerberos.KerberosLogin: TGT refresh sleeping until: Thu May 07 19:25:22 UTC 2020
20/05/07 10:59:00 ERROR streaming.StreamExecution: Query [id = 6d08d948-6c28-4282-b108-eac99c62e253, runId = 94d599d9-b7a1-4cdc-937f-8d98390fb509] terminated with error
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
        at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:297)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:88)
        at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:243)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:158)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:155)
        at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
        at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:155)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:153)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        ...
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
        at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan$lzycompute(StreamExecution.scala:153)
        at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan(StreamExecution.scala:147)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:276)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: SSL trust store is specified, but trust store password is not specified.
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
        ... 370 more
Caused by: org.apache.kafka.common.KafkaException: SSL trust store is specified, but trust store password is not specified.
        at org.apache.kafka.common.security.ssl.SslFactory.createTruststore(SslFactory.java:195)
        at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:115)
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:83)
        ... 373 more

Q1. Is there any way to skip the truststore.password property when reading from Kafka? (We do not have a password, and we do not need one to talk to Kafka.)

Are there any suggestions or workarounds?

scala apache-spark ssl apache-kafka spark-structured-streaming
1 Answer

First of all, I would recommend that you upgrade to Spark 2.4.5. That also pulls in a newer Kafka client, and as far as I know newer Kafka clients treat ssl.truststore.password as optional: if it is not set, the configured truststore is still used, but integrity checking is disabled.
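A minimal sketch of the matching build change, assuming an sbt build and Scala 2.11 (the build tool and Scala version are my assumptions; the question only mentions Spark 2.2.0):

// build.sbt -- hypothetical; adjust the Scala/Spark versions to your cluster
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "2.4.5" % "provided",
  // Kafka source for Structured Streaming; keep it in sync with Spark
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5"
)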

If you do not need SSL, simply do not set it:

val kafkaDataFrame = spark
  .readStream
  .format("kafka")  // short alias for the Kafka source provider
  .option("kafka.bootstrap.servers", kafkaReaderConfig.kafka_brokers)
  .option("subscribe", kafkaReaderConfig.topics_set)
  .load()
  .selectExpr("CAST(value AS STRING) as value")
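If your brokers do require SASL_SSL, a workaround I would try (a sketch under assumptions, not a verified fix) is to drop kafka.ssl.truststore.location entirely and point the JVM's default truststore at your JKS file via javax.net.ssl.trustStore. JSSE can open a JKS truststore without a password (the password is only used for an integrity check), so no javax.net.ssl.trustStorePassword is needed, and when ssl.truststore.location is unset the Kafka client falls back to the JVM default truststore. This assumes the truststore file is available at the same path on every node; kafkaReaderConfig and jaas_config_str follow the names in your question:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kafka-to-kudu")  // hypothetical app name
  // In cluster mode the driver JVM is already running by the time this
  // executes, so the driver option is better passed on spark-submit;
  // shown here only for completeness.
  .config("spark.driver.extraJavaOptions",
    s"-Djavax.net.ssl.trustStore=${kafkaReaderConfig.trust_jks_file_path}")
  .config("spark.executor.extraJavaOptions",
    s"-Djavax.net.ssl.trustStore=${kafkaReaderConfig.trust_jks_file_path}")
  .getOrCreate()

val kafkaDataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaReaderConfig.kafka_brokers)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "GSSAPI")
  .option("kafka.sasl.jaas.config", jaas_config_str)
  // No kafka.ssl.truststore.location here: without it, the Kafka client
  // uses the JVM default truststore, which does not need a password.
  .option("subscribe", kafkaReaderConfig.topics_set)
  .load()
  .selectExpr("CAST(value AS STRING) as value")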