PySpark aggregation of an array of structs


I have the following schema in PySpark:

root
 |-- id: string (nullable = true)
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- seconds: decimal(38,18) (nullable = true)
 |-- total_seconds: decimal(38,3) (nullable = true)
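
For reference, the same schema expressed with pyspark.sql.types (just a sketch of the tree above):

from pyspark.sql import types as T

schema = T.StructType([
    T.StructField("id", T.StringType(), True),
    T.StructField("data", T.ArrayType(T.StructType([
        T.StructField("id", T.StringType(), True),
        T.StructField("name", T.StringType(), True),
        T.StructField("seconds", T.DecimalType(38, 18), True),
    ]), True), True),
    T.StructField("total_seconds", T.DecimalType(38, 3), True),
])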

I have two PySpark DataFrames that I need to join and aggregate together. Both DataFrames have the same schema.

Given the following input in one DataFrame:

[{
    "id": 1,
    "data": [{
        "id": "123",
        "name": "name1",
        "seconds": 50
    }, {
        "id": "234",
        "name": "name2",
        "seconds": 25
    }],
    "total_seconds": 100
}, {
    "id": 2,
    "data": [{
        "id": "123",
        "name": "name1",
        "seconds": 100
    }, {
        "id": "234",
        "name": "name2",
        "seconds": 200
    }],
    "total_seconds": 400
}]

In the second DataFrame I have the following data:

[{
    "id": 1,
    "data": [{
        "id": "123",
        "name": "name1",
        "seconds": 100
    }, {
        "id": "345",
        "name": "name3",
        "seconds": 25
    }],
    "total_seconds": 400
}, {
    "id": 3,
    "data": [{
        "id": "123",
        "name": "name1",
        "seconds": 50
    }, {
        "id": "234",
        "name": "name2",
        "seconds": 100
    }],
    "total_seconds": 200
}]

I would then expect this output:

[{
    "id": 1,
    "data": [{
        "id": "123",
        "name": "name1",
        "seconds": 150
    }, {
        "id": "234",
        "name": "name2",
        "seconds": 25
    }, {
        "id": "345",
        "name": "name3",
        "seconds": 25
    }],
    "total_seconds": 500
}, {
    "id": 2,
    "data": [{
        "id": "123",
        "name": "name1",
        "seconds": 100
    }, {
        "id": "234",
        "name": "name2",
        "seconds": 200
    }],
    "total_seconds": 400
}, {
    "id": 3,
    "data": [{
        "id": "123",
        "name": "name1",
        "seconds": 50
    }, {
        "id": "234",
        "name": "name2",
        "seconds": 100
    }],
    "total_seconds": 200
}]

Essentially, I need to do the following (a snippet to reproduce the sample input follows the list):

  1. Join on the id column
  2. Aggregate total_seconds
  3. Aggregate/merge the data column
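
For reproducibility, the first DataFrame can be built from the first record shown above like this (a minimal sketch that reuses the schema variable from the snippet above):

from decimal import Decimal
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# First record of the first sample input, in id / data / total_seconds order,
# with the outer id given as a string to match the schema.
df1 = spark.createDataFrame(
    [("1",
      [("123", "name1", Decimal("50")), ("234", "name2", Decimal("25"))],
      Decimal("100"))],
    schema,
)
df1.printSchema()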
python apache-spark pyspark apache-spark-sql aggregate-functions
1 Answer

You can solve this by joining the two DataFrames; explode() and groupBy() also help reshape the data before and after the join. The code below is tested; if anything is unclear, add a show() between transformations or leave a comment below:

from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, col, collect_list, explode, lit, struct

spark = SparkSession.builder.master("local[*]").getOrCreate()
df1 = spark.read.option("multiline", "true").json("json1.json")
# Rename the second frame's columns so they do not clash after the join.
df2 = spark.read.option("multiline", "true").json("json2.json") \
    .withColumnRenamed("data", "data_2").withColumnRenamed("id", "id_2").withColumnRenamed("total_seconds", "total_seconds_2")

# One row per inner struct, so elements can be matched on their inner id.
df1_exploded = df1.withColumn("data", explode(col("data")))
df2_exploded = df2.withColumn("data_2", explode(col("data_2"))).drop("total_seconds_2")

# Outer join on the outer id and the inner element id, rebuild the struct by
# coalescing id/name and summing seconds (a missing side counts as 0), then
# collect the merged structs back into an array per id.
resultDf = df1_exploded.join(df2_exploded, (df1_exploded.id == df2_exploded.id_2) & (
            df1_exploded.data.id == df2_exploded.data_2.id), "outer") \
    .withColumn("id", coalesce(col("id"), col("id_2"))) \
    .withColumn("data",
                struct(coalesce(col("data.id"), col("data_2.id")).alias("id"),
                       coalesce(col("data.name"), col("data_2.name")).alias("name"),
                       (coalesce(col("data.seconds"), lit(0)) + coalesce(col("data_2.seconds"), lit(0))).alias("seconds"))) \
    .select("data", "id", "total_seconds") \
    .groupby("id").agg(collect_list("data").alias("data"))

# total_seconds is summed from the original (un-exploded) frames so it is not double counted.
total_seconds_df = df1.join(df2, df1.id == df2.id_2, "outer")\
    .withColumn("id", coalesce(col("id"), col("id_2")))\
    .withColumn("total_seconds", coalesce(col("total_seconds"), lit(0)) + coalesce(col("total_seconds_2"), lit(0)))\
    .select("id", "total_seconds")
resultDf = resultDf.join(total_seconds_df, ["id"], "left")
resultDf.show()

Result:

+---+-------------------------------------------------------+-------------+
|id |data                                                   |total_seconds|
+---+-------------------------------------------------------+-------------+
|1  |[{123, name1, 150}, {234, name2, 25}, {345, name3, 25}]|500          |
|3  |[{123, name1, 50}, {234, name2, 100}]                  |200          |
|2  |[{123, name1, 100}, {234, name2, 200}]                 |400          |
+---+-------------------------------------------------------+-------------+
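
For comparison, here is a union-based sketch of the same idea (untested here; it re-reads both inputs without the column renames used above): since both frames share one schema, you can stack them, explode, and re-aggregate instead of joining:

from pyspark.sql import functions as F

# Re-read both inputs without the renames applied above.
raw1 = spark.read.option("multiline", "true").json("json1.json")
raw2 = spark.read.option("multiline", "true").json("json2.json")
both = raw1.unionByName(raw2)

# Sum seconds per (outer id, inner id, name), then collect the merged structs back into an array.
data_df = both \
    .select("id", F.explode("data").alias("d")) \
    .groupBy("id", F.col("d.id").alias("item_id"), F.col("d.name").alias("name")) \
    .agg(F.sum("d.seconds").alias("seconds")) \
    .groupBy("id") \
    .agg(F.collect_list(F.struct(F.col("item_id").alias("id"), "name", "seconds")).alias("data"))

# Sum total_seconds from the un-exploded rows to avoid double counting.
totals_df = both.groupBy("id").agg(F.sum("total_seconds").alias("total_seconds"))

data_df.join(totals_df, "id").show(truncate=False)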