PySpark:用另一个数据帧插入或更新数据帧

问题描述 投票:0回答:3

我有两个数据框,DF1 和 DF2。 DF1 是主设备,DF2 是增量设备。 DF2 中的数据应插入到 DF1 中或用于更新 DF1 数据。

假设 DF1 具有以下格式:

id_号 开始日期 金额
1 2016-01-01 4650 22
2 2016-01-02 3130 45
1 2016-01-03 4456 22
2 2016-01-15 1234 45

DF2 包含以下内容:

id_号 开始日期 金额
1 2016-01-01 8650 52
2 2016-01-02 7130 65
1 2016-01-06 3456 20
2 2016-01-20 2345 19
3 2016-02-02 1345 19

我需要合并两个数据帧,以便如果 DF2 的“id_no”和“开始日期”与 DF1 匹配,则应在 DF1 中替换它,如果不匹配,则应将其插入到 DF1 中。 “id_no”不是唯一的。

预期结果:

id_号 开始日期 金额
1 2016-01-01 8650 52
2 2016-01-02 7130 65
1 2016-01-03 4456 22
2 2016-01-15 1234 45
1 2016-01-06 3456 20
2 2016-01-20 2345 19
3 2016-02-02 1345 19
python dataframe pyspark apache-spark-sql upsert
3个回答
12
投票

您可以连接

id_no
start_date
上的两个数据框,然后
coalesce
amount
days
列,其中首先来自
df2
的列:

import pyspark.sql.functions as f

df1.alias('a').join(
    df2.alias('b'), ['id_no', 'start_date'], how='outer'
).select('id_no', 'start_date', 
    f.coalesce('b.amount', 'a.amount').alias('amount'), 
    f.coalesce('b.days', 'a.days').alias('days')
).show()

+-----+----------+------+----+
|id_no|start_date|amount|days|
+-----+----------+------+----+
|    1|2016-01-06|  3456|  20|
|    2|2016-01-20|  2345|  19|
|    1|2016-01-03|  4456|  22|
|    3|2016-02-02|  1345|  19|
|    2|2016-01-15|  1234|  45|
|    1|2016-01-01|  8650|  52|
|    2|2016-01-02|  7130|  65|
+-----+----------+------+----+

如果您还有更多列:

cols = ['amount', 'days']

df1.alias('a').join(
    df2.alias('b'), ['id_no', 'start_date'], how='outer'
).select('id_no', 'start_date', 
    *(f.coalesce('b.' + col, 'a.' + col).alias(col) for col in cols)
).show()
+-----+----------+------+----+
|id_no|start_date|amount|days|
+-----+----------+------+----+
|    1|2016-01-06|  3456|  20|
|    2|2016-01-20|  2345|  19|
|    1|2016-01-03|  4456|  22|
|    3|2016-02-02|  1345|  19|
|    2|2016-01-15|  1234|  45|
|    1|2016-01-01|  8650|  52|
|    2|2016-01-02|  7130|  65|
+-----+----------+------+----+

1
投票
如果两个 dfs 具有相同的结构,

union
应该这样做。

from pyspark.sql import functions as F
grp_by = {'id_no', 'start_date'}
df = df2.union(df1)
df = df.groupby(*grp_by).agg(*[F.first(c).alias(c) for c in set(df.columns)-grp_by])
df.show()
#     +-----+----------+----+------+
#     |id_no|start_date|days|amount|
#     +-----+----------+----+------+
#     |    1|2016-01-06|  20|  3456|
#     |    2|2016-01-20|  19|  2345|
#     |    1|2016-01-03|  22|  4456|
#     |    3|2016-02-02|  19|  1345|
#     |    2|2016-01-15|  45|  1234|
#     |    1|2016-01-01|  52|  8650|
#     |    2|2016-01-02|  65|  7130|
#     +-----+----------+----+------+

0
投票

如果您使用 Databricks Delta Lake 表,则可以使用 SQL 的

MERGE INTO
:

将基于源表的一组更新、插入和删除合并到目标 Delta 表中。
此语句仅支持 Delta Lake 表。

您只需要创建一个

new_id
,它是
id_no
start_date
的连接。

MERGE INTO df1
USING df2
ON df1.new_id = df2.new_id
WHEN MATCHED THEN
  UPDATE SET df1.amount = df2.amount, df1.days = df2.days
WHEN NOT MATCHED
  THEN INSERT *
© www.soinside.com 2019 - 2024. All rights reserved.