Spark 结构化流无法正确加载 csv 文件

问题描述 投票:0回答:1

我在 s3 中有管道分隔的 csv 文件,我正在尝试将其加载到 Databricks 中。当使用下面的代码读取时,文件被正确读取:

df_test = spark.read.option("header", True).option("sep", "|").csv(s3_path)
display(df_test)

但是,当我使用结构化流时,它会将文件读取为具有 1 列(所有列名称连接在一起)

df = spark.readStream \
            .format("cloudFiles") \
            .option("cloudFiles.allowOverwrites","true") \
            .option("cloudFiles.format", "csv") \
            .option("delimiter", "|") \
            .option("cloudFiles.schemaLocation", checkpoint_directory) \
            .option("cloudFiles.schemaEvolutionMode", "rescue") \
            .option("header","true") \
            .option("ignoreLeadingWhiteSpace","false") \
            .option("ignoreTrailingWhiteSpace","false") \
            .option("escape","\"") \
            .load(s3_path, sep='|') \
df.display()

出于安全目的隐藏列名称(但是所有数据都是完全假的),但希望您能意识到这里发生了什么。第一个列名称是所有旧列名称的组合 - “col1,col2,col3”。

我已在 ETL 过程中的许多其他表中使用了此逻辑,并且它运行得非常好。这让我认为 csv 文件中存在一些格式错误,但由于第一个代码片段读取得很好,所以我现在完全困惑了。

python apache-spark pyspark databricks spark-structured-streaming
1个回答
0
投票

集群超时,现在再次启动集群后第二个代码片段运行正常。不知道这是否可能,所以我暂时保留这篇文章

© www.soinside.com 2019 - 2024. All rights reserved.