I have the following schema in PySpark:
root
|-- id: string (nullable = true)
|-- data: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: string (nullable = true)
| | |-- name: string (nullable = true)
| | |-- seconds: decimal(38,18) (nullable = true)
|-- total_seconds: decimal(38,3) (nullable = true)
I have two PySpark dataframes that I need to join and aggregate together. Both dataframes share the same schema.
Given the following input in one dataframe:
[{
  "id": 1,
  "data": [{
    "id": "123",
    "name": "name1",
    "seconds": 50
  }, {
    "id": "234",
    "name": "name2",
    "seconds": 25
  }],
  "total_seconds": 100
}, {
  "id": 2,
  "data": [{
    "id": "123",
    "name": "name1",
    "seconds": 100
  }, {
    "id": "234",
    "name": "name2",
    "seconds": 200
  }],
  "total_seconds": 400
}]
And the following data in the second dataframe:
[{
  "id": 1,
  "data": [{
    "id": "123",
    "name": "name1",
    "seconds": 100
  }, {
    "id": "345",
    "name": "name3",
    "seconds": 25
  }],
  "total_seconds": 400
}, {
  "id": 3,
  "data": [{
    "id": "123",
    "name": "name1",
    "seconds": 50
  }, {
    "id": "234",
    "name": "name2",
    "seconds": 100
  }],
  "total_seconds": 200
}]
Then I would expect this output:
[{
  "id": 1,
  "data": [{
    "id": "123",
    "name": "name1",
    "seconds": 150
  }, {
    "id": "234",
    "name": "name2",
    "seconds": 25
  }, {
    "id": "345",
    "name": "name3",
    "seconds": 25
  }],
  "total_seconds": 500
}, {
  "id": 2,
  "data": [{
    "id": "123",
    "name": "name1",
    "seconds": 100
  }, {
    "id": "234",
    "name": "name2",
    "seconds": 200
  }],
  "total_seconds": 400
}, {
  "id": 3,
  "data": [{
    "id": "123",
    "name": "name1",
    "seconds": 50
  }, {
    "id": "234",
    "name": "name2",
    "seconds": 100
  }],
  "total_seconds": 200
}]
Essentially, I need to do the following: full-outer join the two dataframes on id, merge the data arrays by summing seconds for elements with the same inner id, and sum total_seconds.
You can solve this by joining the two dataframes; explode() and groupBy() also help reshape the data before and after the join. Here is tested code — if anything is unclear, add a show() between transformations, or comment below:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, coalesce, struct, lit, collect_list

spark = SparkSession.builder.master("local[*]").getOrCreate()

df1 = spark.read.option("multiline", "true").json("json1.json")
# Rename the second dataframe's columns so they do not collide after the join
df2 = spark.read.option("multiline", "true").json("json2.json") \
    .withColumnRenamed("data", "data_2") \
    .withColumnRenamed("id", "id_2") \
    .withColumnRenamed("total_seconds", "total_seconds_2")

# Explode the arrays so each element can be matched by its inner id
df1_exploded = df1.withColumn("data", explode(col("data")))
df2_exploded = df2.withColumn("data_2", explode(col("data_2"))).drop("total_seconds_2")

# Full outer join on both the row id and the element id, then sum the seconds;
# coalesce() handles elements present in only one of the dataframes.
# The struct fields are aliased so the output keeps the original field names.
resultDf = df1_exploded.join(df2_exploded, (df1_exploded.id == df2_exploded.id_2) &
                             (df1_exploded.data.id == df2_exploded.data_2.id), "outer") \
    .withColumn("id", coalesce(col("id"), col("id_2"))) \
    .withColumn("data",
                struct(coalesce(col("data.id"), col("data_2.id")).alias("id"),
                       coalesce(col("data.name"), col("data_2.name")).alias("name"),
                       (coalesce(col("data.seconds"), lit(0)) +
                        coalesce(col("data_2.seconds"), lit(0))).alias("seconds"))) \
    .select("id", "data") \
    .groupby("id").agg(collect_list("data").alias("data"))

# total_seconds is summed at the row level, from the un-exploded dataframes
total_seconds_df = df1.join(df2, df1.id == df2.id_2, "outer") \
    .withColumn("id", coalesce(col("id"), col("id_2"))) \
    .withColumn("total_seconds", coalesce(col("total_seconds"), lit(0)) +
                coalesce(col("total_seconds_2"), lit(0))) \
    .select("id", "total_seconds")

resultDf = resultDf.join(total_seconds_df, ["id"], "left")
resultDf.show(truncate=False)
Result:
+---+-------------------------------------------------------+-------------+
|id |data |total_seconds|
+---+-------------------------------------------------------+-------------+
|1 |[{123, name1, 150}, {234, name2, 25}, {345, name3, 25}]|500 |
|3 |[{123, name1, 50}, {234, name2, 100}] |200 |
|2 |[{123, name1, 100}, {234, name2, 200}] |400 |
+---+-------------------------------------------------------+-------------+
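As a cross-check (not part of the Spark answer), the same merge rule can be reproduced in plain Python on the sample data from the question. `merge_rows` is a hypothetical helper written for this sketch, not a Spark API; it sorts element ids for a deterministic order, whereas Spark's collect_list() makes no ordering guarantee:

```python
from collections import defaultdict

def merge_rows(rows1, rows2):
    """Merge rows by id: sum seconds per inner element id and sum total_seconds."""
    merged = {}
    for row in rows1 + rows2:
        entry = merged.setdefault(row["id"],
                                  {"seconds": defaultdict(int), "names": {}, "total": 0})
        entry["total"] += row["total_seconds"]
        for el in row["data"]:
            entry["seconds"][el["id"]] += el["seconds"]
            entry["names"][el["id"]] = el["name"]
    # Rebuild rows, sorting by row id and element id for a stable output
    return [{"id": rid,
             "data": [{"id": eid, "name": entry["names"][eid], "seconds": s}
                      for eid, s in sorted(entry["seconds"].items())],
             "total_seconds": entry["total"]}
            for rid, entry in sorted(merged.items())]

# The two inputs from the question
rows1 = [
    {"id": 1, "data": [{"id": "123", "name": "name1", "seconds": 50},
                       {"id": "234", "name": "name2", "seconds": 25}],
     "total_seconds": 100},
    {"id": 2, "data": [{"id": "123", "name": "name1", "seconds": 100},
                       {"id": "234", "name": "name2", "seconds": 200}],
     "total_seconds": 400},
]
rows2 = [
    {"id": 1, "data": [{"id": "123", "name": "name1", "seconds": 100},
                       {"id": "345", "name": "name3", "seconds": 25}],
     "total_seconds": 400},
    {"id": 3, "data": [{"id": "123", "name": "name1", "seconds": 50},
                       {"id": "234", "name": "name2", "seconds": 100}],
     "total_seconds": 200},
]

result = merge_rows(rows1, rows2)
```

Running this reproduces the expected output above: id 1 gets seconds 150/25/25 and total_seconds 500, while ids 2 and 3 pass through unchanged.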