如何解决在我pyspark代码这个reducebykey转型问题?

问题描述 投票:0回答:1

我有点坚持,如何得到这个值是正确的。下面是我的样本数据:

col_name,Category,SegmentID,total_cnt,PercentDistribution
city,ANTIOCH,1,1,15
city,ARROYO GRANDE,1,1,15
state,CA,1,3,15
state,NZ,1,4,15

enter image description here

我试图让输出数据帧为:

enter image description here

我可以到这为止。这里需要你的帮助。

    from pyspark.sql.types import StructType,StructField,StringType,IntegerType
    import json

    join_df=spark.read.csv("/tmp/testreduce.csv",inferSchema=True, header=True)
    jsonSchema = StructType([StructField("Name", StringType())
                           , StructField("Value", IntegerType())
                           , StructField("CatColName", StringType())
                           , StructField("CatColVal", StringType())
                        ])
    def reduceKeys(row1, row2):
            row1[0].update(row2[0])
            return row1

    res_df=join_df.rdd.map(lambda row: ("Segment " + str(row[2]), ({row[1]: row[3]},row[0],row[4])))\
.reduceByKey(lambda x, y: reduceKeys(x, y))\
.map(lambda row: (row[0], row[1][2],row[1][1], json.dumps(row[1][0]))).toDF(jsonSchema)

我当前的代码的输出:

它不是基于段ID和CatColName正确地对数据进行分组。

enter image description here

apache-spark pyspark apache-spark-sql pyspark-sql
1个回答
1
投票

问题是,reduceByKey需要你生成的字符串Segment 1考虑,这是相等的城市和国家。如果添加了col_name在它按预期工作的开始,但你得到你的结果不同的名称。这可以用正则表达式来改变

res_df=test_df.rdd.map(lambda row: ("Segment " + str(row[2]) +" " + str(row[0]), ({row[1]: row[3]},row[0],row[4])))\
.reduceByKey(lambda x, y: reduceKeys(x, y))\
.map(lambda row: (row[0], row[1][2],row[1][1], json.dumps(row[1][0]))).toDF(jsonSchema).withColumn("name",regexp_extract(col("name"),"(\w+\s\d+)",1))

res_df.show(truncate=False)

输出:

+---------+-----+----------+----------------------------------+
|name     |Value|CatColName|CatColVal                         |
+---------+-----+----------+----------------------------------+
|Segment 1|15   |city      |{"ANTIOCH": 1, "ARROYO GRANDE": 1}|
|Segment 1|15   |state     |{"CA": 3, "NZ": 4}                |
+---------+-----+----------+----------------------------------+

只需要最后REGEXP_EXTRACT恢复原来的名称。

© www.soinside.com 2019 - 2024. All rights reserved.