我在 s3 中有管道分隔的 csv 文件,我正在尝试将其加载到 Databricks 中。当使用下面的代码读取时,文件被正确读取:
df_test = spark.read.option("header", True).option("sep", "|").csv(s3_path)
display(df_test)
但是,当我使用结构化流时,它会将文件读取为具有 1 列(所有列名称连接在一起)
df = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.allowOverwrites","true") \
.option("cloudFiles.format", "csv") \
.option("delimiter", "|") \
.option("cloudFiles.schemaLocation", checkpoint_directory) \
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.option("header","true") \
.option("ignoreLeadingWhiteSpace","false") \
.option("ignoreTrailingWhiteSpace","false") \
.option("escape","\"") \
.load(s3_path, sep='|') \
df.display()
出于安全目的隐藏列名称(但是所有数据都是完全假的),但希望您能意识到这里发生了什么。第一个列名称是所有旧列名称的组合 - “col1,col2,col3”。
我已在 ETL 过程中的许多其他表中使用了此逻辑,并且它运行得非常好。这让我认为 csv 文件中存在一些格式错误,但由于第一个代码片段读取得很好,所以我现在完全困惑了。
集群超时,现在再次启动集群后第二个代码片段运行正常。不知道这是否可能,所以我暂时保留这篇文章