我该如何进行转换?

问题描述 投票:0回答:1

env:spark2.4.5

source.json:

{
    "a_key": "1",
    "a_pro": "2",
    "a_con": "3",
    "b_key": "4",
    "b_pro": "5",
    "b_con": "6",
    "c_key": "7",
    "c_pro": "8",
    "c_con": "9",
    ...
}

traget.json:

{
    "factors": [
        {
            "name": "a",
            "key": "1",
            "pros": "2",
            "cons": "3"
        },
        {
            "name": "b",
            "key": "4",
            "pros": "5",
            "cons": "6"
        },
        {
            "name": "c",
            "key": "7",
            "pros": "8",
            "cons": "9"
        },
        ...
    ]
}

您可以看到目标'名称'是源代码的一部分。例如,“ a”是“ a_key”,“ a_pro”,“ a_con”的“名称”。我真的不知道如何从键中提取值,并进行一些“分组”转换。有人可以给我一些建议吗?

apache-spark pyspark apache-spark-sql spark-streaming
1个回答
0
投票

IIUC首先从输入的json创建数据框

json_data = {
    "a_key": "1",
    "a_pro": "2",
    "a_con": "3",
    "b_key": "4",
    "b_pro": "5",
    "b_con": "6",
    "c_key": "7",
    "c_pro": "8",
    "c_con": "9"
}
df=spark.createDataFrame(list(map(list,json_data.items())),['key','value'])
df.show()

+-----+-----+
|  key|value|
+-----+-----+
|a_key|    1|
|a_pro|    2|
|a_con|    3|
|b_key|    4|
|b_pro|    5|
|b_con|    6|
|c_key|    7|
|c_pro|    8|
|c_con|    9|
+-----+-----+

现在从现有列中创建所需的列

import pyspark.sql.functions as  f
df2 = df.withColumn('Name', f.substring('key',1,1)).\
         withColumn('Attributes', f.concat(f.split('key','_')[1],f.lit('s')))
df2.show()
+-----+-----+----+----------+
|  key|value|Name|Attributes|
+-----+-----+----+----------+
|a_key|    1|   a|      keys|
|a_pro|    2|   a|      pros|
|a_con|    3|   a|      cons|
|b_key|    4|   b|      keys|
|b_pro|    5|   b|      pros|
|b_con|    6|   b|      cons|
|c_key|    7|   c|      keys|
|c_pro|    8|   c|      pros|
|c_con|    9|   c|      cons|
+-----+-----+----+----------+

现在旋转数据框并将结果收集为json对象

output_json = df2.groupBy('Name').\
                  pivot('Attributes').\
                  agg(f.min('value')).\         
                  select(f.collect_list(f.struct('Name','keys','cons','pros')).alias('factors')).\
                  toJSON().collect()

import json
print(json.dumps(json.loads(output_json[0]),indent=4))

{
    "factors": [
        {
            "Name": "c",
            "keys": "7",
            "cons": "9",
            "pros": "8"
        },
        {
            "Name": "b",
            "keys": "4",
            "cons": "6",
            "pros": "5"
        },
        {
            "Name": "a",
            "keys": "1",
            "cons": "3",
            "pros": "2"
        }
    ]
}

© www.soinside.com 2019 - 2024. All rights reserved.