执行从 Databricks 到 Azure SQL 数据库的更新插入操作

问题描述 投票:0回答:1

我正在研究一种使用 PySpark 将数据更新插入我的 Azure SQL 数据库的方法。我想使用 Merge 语句来实现此目的,但我不确定如何使其与 Azure sql 数据库一起使用。我尝试过使用spark.write和spark.read方法,但它们没有按预期工作。现在,我正在使用 JDBC 连接到 Azure SQL 数据库。

from pyspark.sql import SparkSession
from pyspark.sql.types import *


# Connect to SQL Server using key vault credentials
scope_name = "TEST-TEST-scope"
jdbc_username = dbutils.secrets.get(scope_name, "key-username")
jdbc_password = dbutils.secrets.get(scope_name, "key-password")
jdbc_url = f"jdbc:sqlserver://server_name.database.windows.net:1433;database=test-database"
connection_properties = {
    "user": jdbc_username,
    "password": jdbc_password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

以下是我的程序定义。

def process_dataframe_upsert(url, dbtable, dataframe):

    dataframe.createOrReplaceTempView("v_new_entries")

    upsert_sql = """
        MERGE INTO {} AS target
    USING v_new_entries AS source
    ON target.Col1 = source.Col1 and target.Col2 = source.Col2 
    WHEN MATCHED THEN
        UPDATE SET target.Col3 = target.Col3 +1  
    WHEN NOT MATCHED THEN
        INSERT (Col1,Col2,Date,WorkShift,ExitStatus,Col3) VALUES (source.Col1,source.Col2,source.Col3)
    """.format(dbtable)

    #return_df = spark.read.jdbc(url=url, table="(" + upsert_sql + ") AS tmp", properties=connection_properties)
    #return return_df
    dataframe.write.jdbc(url=url, table="(" + upsert_sql + ") AS tmp", mode="append", properties=connection_properties)

程序执行示例如下

 process_dataframe_upsert(jdbc_url, table_name, test_df)

我得到的错误如下。

com.microsoft.sqlserver.jdbc.SQLServerException:“(”附近的语法不正确。

问题似乎出在表参数中调用upsert_sql。

table="(" + upsert_sql + ") AS tmp"

有没有其他方法可以更有效地完成这项任务?

apache-spark pyspark databricks azure-databricks databricks-sql
1个回答
0
投票

Spark 中在 Spark

table
参数中使用 SQL 查询存在限制。

因此,您可以做的是创建一个临时表并使用 JayDeBeApi 运行查询。使用以下命令安装此库。

pip install JayDeBeApi

像这样修改你的函数:

import jaydebeapi

def process_dataframe_upsert(url, dbtable, dataframe):

    dataframe.write.jdbc(url=url, table="stg", mode="overwrite")

    upsert_sql = """
    MERGE INTO {} AS target
    USING stg AS source
    ON target.id = source.id 
    WHEN MATCHED THEN
        UPDATE SET target.age =  source.age
    WHEN NOT MATCHED THEN
    INSERT (id, name_1, age) VALUES (source.id, source.name_1, source.age);
    """.format(dbtable)

    conn = jaydebeapi.connect("com.microsoft.sqlserver.jdbc.SQLServerDriver", url)
    curs = conn.cursor()
    curs.execute(upsert_sql)
    curs.close()
    conn.close()

数据框中的数据:

id 名称_1 年龄
3 约翰 25
4 爱丽丝 30
5 鲍勃 35

表中结果:

enter image description here

在这里,您可以看到临时表已创建,并使用该表执行查询。

您也可以使用

pyodbc
来实现此目的,但您需要安装 ODBC 驱动程序。

© www.soinside.com 2019 - 2024. All rights reserved.