如何从Spark SQL数据帧数据将增量数据写入ADLS存储

问题描述 投票:0回答:1

我们正在将现有的 SQL 查询迁移到 pyspark 支持的 Spark SQL 查询中,然后最终的 Spark sql 生成的数据帧需要写入 ADLS gen2 中的增量格式文件中。我们期望当我们多次运行时,查询源表数据应该以增量方式写回到ADLS位置。请建议。

我们尝试使用下面的代码来编写截断并加载数据加载到ADLS的所有时间,

drop table IF EXISTS tableName;
test = spark.sql(""" select * from datavault.tablename """) 
test..write.format("delta").mode("overwrite").option("overwriteschema","true").option("path",'/mnt/storagelocation/tablename').saveAsTable("tablename")

期望第一次加载完整数据,如果执行多次,数据应该增量加载到ADLS位置。请建议。

dataframe pyspark azure-blob-storage azure-databricks
1个回答
0
投票

例如,我有这样的初始源数据:

emp_id  name    city    country contact_info
1000    Mike    NJ      USA     1234560

接下来,我创建了

DELTA
表:

CREATE OR REPLACE TABLE emp_dim(
emp_id INT,
name STRING,
city STRING,
country STRING,
contact_info INT
)
USING DELTA
LOCATION "/FileStore/tables/emp_dim_delta"

为了对

MERGE
记录执行
MATCHED
操作,我创建了
TEMPVIEW
:

df.createOrReplaceTempView("source_view")

然后,我尝试了以下

MERGE
操作:

MERGE INTO emp_dim as target 
USING source_view as source 
 ON target.emp_id = source.emp_id
 WHEN MATCHED 
THEN UPDATE SET
  target.name= source.name,
  target.city = source.city,
  target.country = source.country,
  target.contact_info = source.contact_info
WHEN NOT MATCHED THEN
INSERT(emp_id,name,city,country,contact_info)VALUES (emp_id,name,city,country,contact_info)

执行

MERGE
语句后,我们可以看到
DELTA
表已初始加载。

SELECT * FROM emp_dim
emp_id  name    city    country contact_info
1000    Mike    NJ      USA     1234560

接下来,我在源数据中有

UPDATED
一条记录和
INSERTED
一条记录,并执行了
MERGE
语句。

结果:

emp_id  name        city    country contact_info
2000    corleone    Nevada  USA     78910111
1000    Mike        NYC     USA     1234560

然后,您可以写信给ADLS。 在我的示例中,我将 Delta 表写入 ADLS 路径,如下所示:

file_path ="abfss://[email protected]/ADB_Delta/"
test = spark.sql("""SELECT * FROM emp_dim02""")
test.write.format("delta").mode("overwrite").save(file_path)
© www.soinside.com 2019 - 2024. All rights reserved.