我正在研究一种使用 PySpark 将数据更新插入我的 Azure SQL 数据库的方法。我想使用 Merge 语句来实现此目的,但我不确定如何使其与 Azure sql 数据库一起使用。我尝试过使用spark.write和spark.read方法,但它们没有按预期工作。现在,我正在使用 JDBC 连接到 Azure SQL 数据库。
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# Connect to SQL Server using key vault credentials
scope_name = "TEST-TEST-scope"
jdbc_username = dbutils.secrets.get(scope_name, "key-username")
jdbc_password = dbutils.secrets.get(scope_name, "key-password")
jdbc_url = f"jdbc:sqlserver://server_name.database.windows.net:1433;database=test-database"
connection_properties = {
"user": jdbc_username,
"password": jdbc_password,
"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
以下是我的程序定义。
def process_dataframe_upsert(url, dbtable, dataframe):
dataframe.createOrReplaceTempView("v_new_entries")
upsert_sql = """
MERGE INTO {} AS target
USING v_new_entries AS source
ON target.Col1 = source.Col1 and target.Col2 = source.Col2
WHEN MATCHED THEN
UPDATE SET target.Col3 = target.Col3 +1
WHEN NOT MATCHED THEN
INSERT (Col1,Col2,Date,WorkShift,ExitStatus,Col3) VALUES (source.Col1,source.Col2,source.Col3)
""".format(dbtable)
#return_df = spark.read.jdbc(url=url, table="(" + upsert_sql + ") AS tmp", properties=connection_properties)
#return return_df
dataframe.write.jdbc(url=url, table="(" + upsert_sql + ") AS tmp", mode="append", properties=connection_properties)
程序执行示例如下
process_dataframe_upsert(jdbc_url, table_name, test_df)
我得到的错误如下。
com.microsoft.sqlserver.jdbc.SQLServerException:“(”附近的语法不正确。
问题似乎出在表参数中调用upsert_sql。
table="(" + upsert_sql + ") AS tmp"
有没有其他方法可以更有效地完成这项任务?
Spark 中在 Spark
table
参数中使用 SQL 查询存在限制。
因此,您可以做的是创建一个临时表并使用 JayDeBeApi 运行查询。使用以下命令安装此库。
pip install JayDeBeApi
像这样修改你的函数:
import jaydebeapi
def process_dataframe_upsert(url, dbtable, dataframe):
dataframe.write.jdbc(url=url, table="stg", mode="overwrite")
upsert_sql = """
MERGE INTO {} AS target
USING stg AS source
ON target.id = source.id
WHEN MATCHED THEN
UPDATE SET target.age = source.age
WHEN NOT MATCHED THEN
INSERT (id, name_1, age) VALUES (source.id, source.name_1, source.age);
""".format(dbtable)
conn = jaydebeapi.connect("com.microsoft.sqlserver.jdbc.SQLServerDriver", url)
curs = conn.cursor()
curs.execute(upsert_sql)
curs.close()
conn.close()
数据框中的数据:
id | 名称_1 | 年龄 |
---|---|---|
3 | 约翰 | 25 |
4 | 爱丽丝 | 30 |
5 | 鲍勃 | 35 |
表中结果:
在这里,您可以看到临时表已创建,并使用该表执行查询。
您也可以使用
pyodbc
来实现此目的,但您需要安装 ODBC 驱动程序。