我正在尝试从kafka读取数据并将其保存到hdfs的实木复合地板文件中。我的代码类似于以下内容,区别在于我使用Java编写。
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)").writeStream.format("parquet").option("path",outputPath).option("checkpointLocation", "/tmp/sparkcheckpoint1/").outputMode("append").start().awaiteTermination()
但是它抛出了Uri without authority: hdfs:/data/_spark_metadata
异常,其中hdfs:///data
是输出路径。
当我将代码更改为spark.read
和df.write
一次写入镶木地板文件时,没有任何异常,因此我想它与我的hdfs配置无关。
有人可以帮我吗?
Here有建议从路径中删除hdfs://,但将其保留在检查点中。但是,对我来说,在HDP沙箱中,无论在检查点有无hdfs://,它都能同时工作:
.option("path", "/user/username/outpath")
.option("checkpointLocation", "/tmp/checkpoint")