我有两个数据框,DF1 和 DF2。 DF1 是主设备,DF2 是增量设备。 DF2 中的数据应插入到 DF1 中或用于更新 DF1 数据。
假设 DF1 具有以下格式:
id_号 | 开始日期 | 金额 | 天 |
---|---|---|---|
1 | 2016-01-01 | 4650 | 22 |
2 | 2016-01-02 | 3130 | 45 |
1 | 2016-01-03 | 4456 | 22 |
2 | 2016-01-15 | 1234 | 45 |
DF2 包含以下内容:
id_号 | 开始日期 | 金额 | 天 |
---|---|---|---|
1 | 2016-01-01 | 8650 | 52 |
2 | 2016-01-02 | 7130 | 65 |
1 | 2016-01-06 | 3456 | 20 |
2 | 2016-01-20 | 2345 | 19 |
3 | 2016-02-02 | 1345 | 19 |
我需要合并两个数据帧,以便如果 DF2 的“id_no”和“开始日期”与 DF1 匹配,则应在 DF1 中替换它,如果不匹配,则应将其插入到 DF1 中。 “id_no”不是唯一的。
预期结果:
id_号 | 开始日期 | 金额 | 天 |
---|---|---|---|
1 | 2016-01-01 | 8650 | 52 |
2 | 2016-01-02 | 7130 | 65 |
1 | 2016-01-03 | 4456 | 22 |
2 | 2016-01-15 | 1234 | 45 |
1 | 2016-01-06 | 3456 | 20 |
2 | 2016-01-20 | 2345 | 19 |
3 | 2016-02-02 | 1345 | 19 |
您可以连接
id_no
和 start_date
上的两个数据框,然后 coalesce
amount
和 days
列,其中首先来自 df2
的列:
import pyspark.sql.functions as f
df1.alias('a').join(
df2.alias('b'), ['id_no', 'start_date'], how='outer'
).select('id_no', 'start_date',
f.coalesce('b.amount', 'a.amount').alias('amount'),
f.coalesce('b.days', 'a.days').alias('days')
).show()
+-----+----------+------+----+
|id_no|start_date|amount|days|
+-----+----------+------+----+
| 1|2016-01-06| 3456| 20|
| 2|2016-01-20| 2345| 19|
| 1|2016-01-03| 4456| 22|
| 3|2016-02-02| 1345| 19|
| 2|2016-01-15| 1234| 45|
| 1|2016-01-01| 8650| 52|
| 2|2016-01-02| 7130| 65|
+-----+----------+------+----+
如果您还有更多列:
cols = ['amount', 'days']
df1.alias('a').join(
df2.alias('b'), ['id_no', 'start_date'], how='outer'
).select('id_no', 'start_date',
*(f.coalesce('b.' + col, 'a.' + col).alias(col) for col in cols)
).show()
+-----+----------+------+----+
|id_no|start_date|amount|days|
+-----+----------+------+----+
| 1|2016-01-06| 3456| 20|
| 2|2016-01-20| 2345| 19|
| 1|2016-01-03| 4456| 22|
| 3|2016-02-02| 1345| 19|
| 2|2016-01-15| 1234| 45|
| 1|2016-01-01| 8650| 52|
| 2|2016-01-02| 7130| 65|
+-----+----------+------+----+
union
应该这样做。
from pyspark.sql import functions as F
grp_by = {'id_no', 'start_date'}
df = df2.union(df1)
df = df.groupby(*grp_by).agg(*[F.first(c).alias(c) for c in set(df.columns)-grp_by])
df.show()
# +-----+----------+----+------+
# |id_no|start_date|days|amount|
# +-----+----------+----+------+
# | 1|2016-01-06| 20| 3456|
# | 2|2016-01-20| 19| 2345|
# | 1|2016-01-03| 22| 4456|
# | 3|2016-02-02| 19| 1345|
# | 2|2016-01-15| 45| 1234|
# | 1|2016-01-01| 52| 8650|
# | 2|2016-01-02| 65| 7130|
# +-----+----------+----+------+
如果您使用 Databricks Delta Lake 表,则可以使用 SQL 的
MERGE INTO
:
将基于源表的一组更新、插入和删除合并到目标 Delta 表中。
此语句仅支持 Delta Lake 表。
您只需要创建一个
new_id
,它是 id_no
和 start_date
的连接。
MERGE INTO df1
USING df2
ON df1.new_id = df2.new_id
WHEN MATCHED THEN
UPDATE SET df1.amount = df2.amount, df1.days = df2.days
WHEN NOT MATCHED
THEN INSERT *