Pyspark:如何使用不同的列连接两个具有不同条件的不同数据集?

问题描述 投票:0回答:2

我将把这两个数据集与不同列的不同条件连接起来,以获得 Pyspark 中的一个数据集

第一个数据集df1:

rc1 rc2 rc3 回应
AB2 AB1 AB6
AB4 AB3 AB7 谢因
AB9 AB5 AB8 帕特里克

第二个数据集df2:

钥匙 描述
AB1 正常
AB4 展开
AB3
AB6
AB8 第一
AB2 码头
AB7 失踪了
AB9 套餐
AB5 错了

我将在最终数据集中获得 df 与 df1 和 df2 的连接:

rc1 rc2 rc3 回应
码头 正常
展开 失踪了 谢因
套餐 错了 第一 帕特里克
dataframe pyspark
2个回答
1
投票

参见下面的实现 -

df = (
       df1.join(df2, df1.rc1 == df2.Key, 'inner').drop("Key","rc1") 
          .withColumnRenamed('description', 'rc1') 
          .join(df2, df1.rc2 == df2.Key, 'inner').drop("Key","rc2") 
          .withColumnRenamed('description', 'rc2') 
          .join(df2, df1.rc3 == df2.Key, 'inner').drop("Key","rc3") 
          .withColumnRenamed('description', 'rc3') 
          .select("rc1","rc2","rc3","resp")
     )

df.show()

+-------+------+-------+-------+
|    rc1|   rc2|    rc3|   resp|
+-------+------+-------+-------+
|   Dock|Normal|    Big|   jean|
|Package| Wrong|  First|patrick|
| Expand| Small|Missing|  shein|
+-------+------+-------+-------+

0
投票

代码

定义要替换值的列的列表。创建一个堆栈表达式并堆栈数据帧。然后

join
df2
替换基于公共
Key
的值。最后,
pivot
数据框将其重塑回来

cols = ['rc1', 'rc2', 'rc3']
expr = f"stack({len(cols)}, %s) as (rc, Key)" % ', '.join(f"'{c}', {c}" for c in cols)
result = (
    df1.selectExpr('resp', expr)
    .join(df2, on='key', how='left')
    .drop('Key')
    .groupBy('resp')
    .pivot('rc')
    .agg(F.first('description'))
)

结果

+-------+-------+------+-------+
|   resp|    rc1|   rc2|    rc3|
+-------+-------+------+-------+
|   jean|   Dock|Normal|    Big|
|patrick|Package| Wrong|  First|
|  shein| Expand| small|Missing|
+-------+-------+------+-------+
© www.soinside.com 2019 - 2024. All rights reserved.