我有嵌套的 JSON 文件,我需要将每个文件放入数据框的一个单元格中。
最初的想法是采用嵌套的 json,再创建一个名为“DataType”的键值的列,将整个 json 文件放在第二列中并将其写入按该数据类型分区的 S3 存储桶中,编写代码如下所示:
def write_data(data_df, output_path):
data_df.coalesce(100).write.partitionBy("DataType").mode("append").parquet(output_path)
基本上就是整理胶水工作。
我试过这个:
df = dyf.toDF()
df2 = df.withColumn("data", lit(df.toJSON().first()))
它看起来很好,直到我处理多个 JSON 文件,在输出中,由于这个 first(),在输出中每一行都有相同的 json。
在此处添加工作解决方案:
df2 = df.withColumn("data", to_json(struct([df[col] for col in df.columns])))