我将把这两个数据集在不同的列上以不同的条件连接起来,以获得 Pyspark 中的一个数据集。
第一个数据集df1:
rc1 | rc2 | rc3 | 回应 |
---|---|---|---|
AB2 | AB1 | AB6 | 让 |
AB4 | AB3 | AB7 | 谢因 |
AB9 | AB5 | AB8 | 帕特里克 |
第二个数据集df2:
钥匙 | 描述 |
---|---|
AB1 | 正常 |
AB4 | 展开 |
AB3 | 小 |
AB6 | 大 |
AB8 | 第一 |
AB2 | 码头 |
AB7 | 失踪了 |
AB9 | 套餐 |
AB5 | 错了 |
我将在最终数据集中获得 df 与 df1 和 df2 的连接:
rc1 | rc2 | rc3 | 回应 |
---|---|---|---|
码头 | 正常 | 大 | 让 |
展开 | 小 | 失踪了 | 谢因 |
套餐 | 错了 | 第一 | 帕特里克 |
参见下面的实现 -
df = (
df1.join(df2, df1.rc1 == df2.Key, 'inner').drop("Key","rc1")
.withColumnRenamed('description', 'rc1')
.join(df2, df1.rc2 == df2.Key, 'inner').drop("Key","rc2")
.withColumnRenamed('description', 'rc2')
.join(df2, df1.rc3 == df2.Key, 'inner').drop("Key","rc3")
.withColumnRenamed('description', 'rc3')
.select("rc1","rc2","rc3","resp")
)
df.show()
+-------+------+-------+-------+
| rc1| rc2| rc3| resp|
+-------+------+-------+-------+
| Dock|Normal| Big| jean|
|Package| Wrong| First|patrick|
| Expand| Small|Missing| shein|
+-------+------+-------+-------+
定义要替换值的列的列表。创建一个堆栈表达式并堆栈数据帧。然后
join
与 df2
替换基于公共 Key
的值。最后,pivot
数据框将其重塑回来
cols = ['rc1', 'rc2', 'rc3']
expr = f"stack({len(cols)}, %s) as (rc, Key)" % ', '.join(f"'{c}', {c}" for c in cols)
result = (
df1.selectExpr('resp', expr)
.join(df2, on='key', how='left')
.drop('Key')
.groupBy('resp')
.pivot('rc')
.agg(F.first('description'))
)
+-------+-------+------+-------+
| resp| rc1| rc2| rc3|
+-------+-------+------+-------+
| jean| Dock|Normal| Big|
|patrick|Package| Wrong| First|
| shein| Expand| small|Missing|
+-------+-------+------+-------+