Making JSON format consistent - Pyspark

Problem description

I have two JSONs in different formats; I want to convert them into a single consistent format and read them into a dataframe.


>>> df.printSchema()
root
 |-- ReplicateRequest: struct (nullable = true)
 |    |-- MappingReplicateRequestMessage: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- MGroup: struct (nullable = true)
 |    |    |    |    |-- Object: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- Code: string (nullable = true)


>>> df1.printSchema()
root
 |-- ReplicateRequest: struct (nullable = true)
 |    |-- MappingReplicateRequestMessage: struct (nullable = true)
 |    |    |-- MGroup: struct (nullable = true)
 |    |    |    |-- Object: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- Code: string (nullable = true)




If I want to access the Object.Code column values:

  1. In the first dataframe, I have to use explode on MappingReplicateRequestMessage to drill down into it:

df.select("ReplicateRequest.*").withColumn("expl",explode((col("MappingReplicateRequestMessage")))).select("expl.*").select("MGroup.Object")

  2. In the second dataframe, I can access it directly without exploding:

df1.select("ReplicateRequest.MappingReplicateRequestMessage.MGroup.*")

How can I make this consistent and generic before parsing, converting either array to struct or struct to array?

json pyspark explode
1 Answer

0 votes

You cannot read two files with different schemas into a single DataFrame with one spark.read call.

You will have to read them into two separate DataFrames, manipulate each DataFrame to create a new one with the common schema you want, and then union them:

df1 = spark.read.csv/parquet/json()
df1 = df1.withColumn('new_json', <logic to convert>)

df2 = spark.read.csv/parquet/json()
df2 = df2.withColumn('new_json', <logic to convert>)

final_df = df1.union(df2)
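
As one hedged illustration of the <logic to convert> step (my own assumption; the original answer leaves it open, and the field names are taken from the question), you could wrap the struct-typed MappingReplicateRequestMessage of the second dataframe in a one-element array so it matches the first dataframe's array-based schema. This requires Spark 3.1+ for Column.withField:

# Sketch only: wrap df1's struct field in a single-element array so both
# dataframes share the array-based schema, then union them by name.
from pyspark.sql.functions import array, col, explode

df1_as_array = df1.withColumn(
    "ReplicateRequest",
    col("ReplicateRequest").withField(
        "MappingReplicateRequestMessage",
        array(col("ReplicateRequest.MappingReplicateRequestMessage")),
    ),
)

final_df = df.unionByName(df1_as_array)

# A single access pattern now works for every row:
final_df.select(
    explode("ReplicateRequest.MappingReplicateRequestMessage").alias("msg")
).select("msg.MGroup.Object.Code")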

Alternatively, you can read the input as a string:

root
 |-- ReplicateRequest: string (nullable = true)

and then apply a udf that can handle both formats, extract Object.Code, and return it, so that you end up with a new column with a unified schema. A reproducible example is needed for that.
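
For instance, a minimal sketch of such a udf (my own assumption about the logic, since the original answer does not spell it out), which assumes the column holds the raw JSON text of ReplicateRequest and tolerates MappingReplicateRequestMessage being either an array or a single object:

import json
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

@udf(returnType=ArrayType(StringType()))
def extract_codes(replicate_request):
    # replicate_request is the raw JSON text of the ReplicateRequest column.
    if replicate_request is None:
        return None
    msg = json.loads(replicate_request)["MappingReplicateRequestMessage"]
    # First format: a list of messages; second format: a single object.
    messages = msg if isinstance(msg, list) else [msg]
    return [obj["Code"]
            for m in messages
            for obj in m["MGroup"]["Object"]]

# Both inputs then yield the same array<string> column:
# df.withColumn("codes", extract_codes("ReplicateRequest"))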


Add sample data to your question to make it a reproducible example, like this:

jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'

df = spark.createDataFrame([(jstr1,),(jstr2,),(jstr3,)], schema=['col1'])
df.show(truncate=False)

This prints:

+----------------------------------------------------------------------------------------------------------------------------------------------------+
|col1                                                                                                                                                |
+----------------------------------------------------------------------------------------------------------------------------------------------------+
|{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}      |
|{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}} |
|{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}|
+----------------------------------------------------------------------------------------------------------------------------------------------------+
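
From there, one possible continuation (my assumption; the answer stops at the reproducible data) is to parse the string column with an explicit schema so every row lands in the same struct layout:

from pyspark.sql.functions import col, from_json

# DDL-formatted schema covering the fields used below.
schema = ("header struct<id:bigint,foo:string>, "
          "body struct<id:bigint,name:string>")

parsed = df.withColumn("parsed", from_json(col("col1"), schema))
parsed.select("parsed.header.id", "parsed.body.name").show()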
