我们正在将现有的 SQL 查询迁移到 pyspark 支持的 Spark SQL 查询中,然后最终的 Spark sql 生成的数据帧需要写入 ADLS gen2 中的增量格式文件中。我们期望当我们多次运行时,查询源表数据应该以增量方式写回到ADLS位置。请建议。
我们尝试使用下面的代码来编写截断并加载数据加载到ADLS的所有时间,
drop table IF EXISTS tableName;
test = spark.sql(""" select * from datavault.tablename """)
test..write.format("delta").mode("overwrite").option("overwriteschema","true").option("path",'/mnt/storagelocation/tablename').saveAsTable("tablename")
期望第一次加载完整数据,如果执行多次,数据应该增量加载到ADLS位置。请建议。
例如,我有这样的初始源数据:
emp_id name city country contact_info
1000 Mike NJ USA 1234560
接下来,我创建了
DELTA
表:
CREATE OR REPLACE TABLE emp_dim(
emp_id INT,
name STRING,
city STRING,
country STRING,
contact_info INT
)
USING DELTA
LOCATION "/FileStore/tables/emp_dim_delta"
为了对
MERGE
记录执行MATCHED
操作,我创建了TEMPVIEW
:
df.createOrReplaceTempView("source_view")
然后,我尝试了以下
MERGE
操作:
MERGE INTO emp_dim as target
USING source_view as source
ON target.emp_id = source.emp_id
WHEN MATCHED
THEN UPDATE SET
target.name= source.name,
target.city = source.city,
target.country = source.country,
target.contact_info = source.contact_info
WHEN NOT MATCHED THEN
INSERT(emp_id,name,city,country,contact_info)VALUES (emp_id,name,city,country,contact_info)
执行
MERGE
语句后,我们可以看到DELTA
表已初始加载。
SELECT * FROM emp_dim
emp_id name city country contact_info
1000 Mike NJ USA 1234560
接下来,我在源数据中有
UPDATED
一条记录和 INSERTED
一条记录,并执行了 MERGE
语句。
结果:
emp_id name city country contact_info
2000 corleone Nevada USA 78910111
1000 Mike NYC USA 1234560
然后,您可以写信给ADLS。 在我的示例中,我将 Delta 表写入 ADLS 路径,如下所示:
file_path ="abfss://[email protected]/ADB_Delta/"
test = spark.sql("""SELECT * FROM emp_dim02""")
test.write.format("delta").mode("overwrite").save(file_path)