I am trying to unpivot a PySpark DataFrame, but I am not getting the correct result.
Sample dataset:
# Prepare Data
data = [("Spain", 101, 201, 301),
        ("Taiwan", 102, 202, 302),
        ("Italy", 103, 203, 303),
        ("China", 104, 204, 304)]

# Create DataFrame
columns = ["Country", "2018", "2019", "2020"]
df = spark.createDataFrame(data=data, schema=columns)
df.show(truncate=False)
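df.show(truncate=False) prints:
+-------+----+----+----+
|Country|2018|2019|2020|
+-------+----+----+----+
|Spain  |101 |201 |301 |
|Taiwan |102 |202 |302 |
|Italy  |103 |203 |303 |
|China  |104 |204 |304 |
+-------+----+----+----+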
Here is what I tried:
from pyspark.sql import functions as F
unpivotExpr = "stack(3, '2018', 2018, '2019', 2019, '2020', 2020) as (Year, CPI)"
unPivotDF = df.select("Country", F.expr(unpivotExpr))
unPivotDF.show()
Result:
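+-------+----+----+
|Country|Year| CPI|
+-------+----+----+
|  Spain|2018|2018|
|  Spain|2019|2019|
|  Spain|2020|2020|
| Taiwan|2018|2018|
| Taiwan|2019|2019|
| Taiwan|2020|2020|
|  Italy|2018|2018|
|  Italy|2019|2019|
|  Italy|2020|2020|
|  China|2018|2018|
|  China|2019|2019|
|  China|2020|2020|
+-------+----+----+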
As shown above, the CPI column contains the same values as the Year column, which is not what I expected. The expected result is for the CPI column to hold the value from the matching year column of the pivoted table for each country, i.e. Spain/2018 should give 101, Spain/2019 should give 201, and so on.
Is there a way to fix this?
UPDATE
Your stack expression is almost correct. The unquoted numbers (2018, 2019, 2020) are parsed as integer literals rather than column references, which is why CPI just repeats the year. To reference columns whose names are numbers, wrap them in backticks:
unpivotExpr = "stack(3, '2018', `2018`, '2019', `2019`, '2020', `2020`) as (Year, CPI)"
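With that one change, the rest of your code works as-is:

from pyspark.sql import functions as F

# `2018` (backticks) is a column reference; '2018' (quotes) is the string label for Year
unpivotExpr = "stack(3, '2018', `2018`, '2019', `2019`, '2020', `2020`) as (Year, CPI)"
unPivotDF = df.select("Country", F.expr(unpivotExpr))
unPivotDF.show()

This yields one (Year, CPI) row per year for each country, matching the expected output shown at the bottom of this post.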
Alternative solution
I could not get it to work with stack, but there is another approach: build a map whose keys are the column names and whose values are the column values, then explode that map:
import pyspark.sql.functions as F

df = df.withColumn("year_cpi_map", F.create_map(
        F.lit("2018"), F.col("2018"),
        F.lit("2019"), F.col("2019"),
        F.lit("2020"), F.col("2020"))) \
    .select("Country", F.explode("year_cpi_map").alias("Year", "CPI"))
Or, more generally:
import pyspark.sql.functions as F
import itertools

# Build (lit(name), col(name)) pairs for every column except "Country"
# and flatten them into one list of alternating keys and values
kv_pairs = list(itertools.chain(*[(F.lit(c), F.col(c)) for c in df.columns if c != "Country"]))

df = df.withColumn("year_cpi_map", F.create_map(kv_pairs)) \
    .select("Country", F.explode("year_cpi_map").alias("Year", "CPI"))
Output:
+-------+----+---+
|Country|Year|CPI|
+-------+----+---+
|Spain |2018|101|
|Spain |2019|201|
|Spain |2020|301|
|Taiwan |2018|102|
|Taiwan |2019|202|
|Taiwan |2020|302|
|Italy |2018|103|
|Italy |2019|203|
|Italy |2020|303|
|China |2018|104|
|China |2019|204|
|China |2020|304|
+-------+----+---+
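Side note: if your environment runs Spark 3.4 or later (an assumption; check spark.version), the built-in DataFrame.unpivot (alias: melt) does this directly, with no stack expression or map needed:

# Requires Spark 3.4+: id columns, value columns, variable column name, value column name
unPivotDF = df.unpivot("Country", ["2018", "2019", "2020"], "Year", "CPI")
unPivotDF.show()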