I am trying to use Spark Structured Streaming to read from Kafka and write to a Kudu sink. Below is the read and write code.
I am using Spark 2.2.0.
val kafkaDataFrame = spark
.readStream
.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
.option("kafka.bootstrap.servers", kafkaReaderConfig.kafka_brokers)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "GSSAPI")
.option("kafka.ssl.truststore.location", kafkaReaderConfig.trust_jks_file_path)
.option("kafka.sasl.jaas.config", jaas_config_str)
.option("subscribe", kafkaReaderConfig.topics_set)
.load()
.selectExpr("CAST(value AS STRING) as value")
// After transformation
dfStrm.writeStream
.option("checkpointLocation", path)
.trigger(Trigger.ProcessingTime("10 seconds"))
.foreach(new KuduStreamWriter(tconfig))
.outputMode("append")
.start()
.awaitTermination()
})
But I get the following exception:
20/05/07 10:59:00 INFO authenticator.AbstractLogin: Successfully logged in.
20/05/07 10:59:00 INFO kerberos.KerberosLogin: TGT refresh thread started.
20/05/07 10:59:00 INFO kerberos.KerberosLogin: TGT valid starting at: Thu May 07 10:58:17 UTC 2020
20/05/07 10:59:00 INFO kerberos.KerberosLogin: TGT expires: Thu May 07 20:58:16 UTC 2020
20/05/07 10:59:00 INFO kerberos.KerberosLogin: TGT refresh sleeping until: Thu May 07 19:25:22 UTC 2020
20/05/07 10:59:00 ERROR streaming.StreamExecution: Query [id = 6d08d948-6c28-4282-b108-eac99c62e253, runId = 94d599d9-b7a1-4cdc-937f-8d98390fb509] terminated with error
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:297)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:88)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:243)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:158)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:155)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:155)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:153)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
...
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan$lzycompute(StreamExecution.scala:153)
at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan(StreamExecution.scala:147)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:276)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: SSL trust store is specified, but trust store password is not specified.
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
... 370 more
Caused by: org.apache.kafka.common.KafkaException: SSL trust store is specified, but trust store password is not specified.
at org.apache.kafka.common.security.ssl.SslFactory.createTruststore(SslFactory.java:195)
at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:115)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:83)
... 373 more
Q.1 Is there any way to skip the truststore.password property when reading from Kafka? (We don't have a password, and none is required to connect to Kafka.)
Are there any suggestions or workarounds?
First, I suggest updating to Spark 2.4.5.
If you don't need SSL, simply don't configure it:
val kafkaDataFrame = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaReaderConfig.kafka_brokers)
.option("subscribe", kafkaReaderConfig.topics_set)
.load()
.selectExpr("CAST(value AS STRING) as value")
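If your brokers do require SASL_SSL, note that the kafka-clients version bundled with Spark 2.2.0 rejects a truststore location without a password (that is exactly the exception above), while newer kafka-clients releases treat `ssl.truststore.password` as optional and only skip the integrity check when it is absent. Until you can upgrade, a common workaround is to supply the password that was used when the JKS file was created. A sketch, reusing the option names from your snippet (the `trust_jks_password` value is a hypothetical placeholder for whatever password your truststore was built with):

```scala
val kafkaDataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaReaderConfig.kafka_brokers)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "GSSAPI")
  .option("kafka.ssl.truststore.location", kafkaReaderConfig.trust_jks_file_path)
  // Kafka uses this only for a JKS integrity check, so a dummy value
  // chosen at truststore-creation time is enough.
  .option("kafka.ssl.truststore.password", trust_jks_password)
  .option("kafka.sasl.jaas.config", jaas_config_str)
  .option("subscribe", kafkaReaderConfig.topics_set)
  .load()
  .selectExpr("CAST(value AS STRING) as value")
```

If the truststore was generated without any password at all, regenerating it with `keytool -storepasswd` (or re-importing the CA cert into a new store with a known password) avoids the problem entirely.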