Efficient UPSERT of a pandas DataFrame into MS SQL Server using pyodbc

Question (1 vote, 1 answer)

I am trying to upsert a pandas DataFrame into MS SQL Server using pyodbc. I have used a similar approach before to do straight inserts, but the solution I tried this time is painfully slow. Is there a more streamlined way to accomplish an upsert than what I have here?

sql_connect = pyodbc.connect('Driver={SQL Server Native Client 11.0}; Server=blank1; Database=blank2; UID=blank3; PWD=blank4')
cursor = sql_connect.cursor()

for index, row in bdf.iterrows():
    res = cursor.execute("UPDATE dbo.MPA_BOOK_RAW SET [SITE]=?, [SHIP_TO]=?, [PROD_LINE]=?, [GROUP_NUMBER]=?, [DESCRIPTION]=?, [ORDER_QTY]=?, [BPS_INCLUDE]=? WHERE [CUST]=? AND [ORDER_NUMBER]=? AND [ORDER_DATE]=? AND [PURCHASE_ORDER]=? AND [CHANNEL]=? AND [ITEM]=? AND [END_DT]=?", 
                    row['SITE'], 
                    row['SHIP_TO'],
                    row['PROD_LINE'],
                    row['GROUP_NUMBER'],
                    row['DESCRIPTION'],
                    row['ORDER_QTY'],
                    row['BPS_INCLUDE'],
                    row['CUST'],
                    row['ORDER_NUMBER'], 
                    row['ORDER_DATE'],
                    row['PURCHASE_ORDER'], 
                    row['CHANNEL'],
                    row['ITEM'],
                    row['END_DT'])

    if res.rowcount == 0:
            cursor.execute("INSERT INTO dbo.MPA_BOOK_RAW ([SITE], [CUST], [ORDER_NUMBER], [ORDER_DATE], [PURCHASE_ORDER], [CHANNEL], [SHIP_TO], [PROD_LINE], [GROUP_NUMBER], [DESCRIPTION], [ITEM], [ORDER_QTY], [END_DT], [BPS_INCLUDE]) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", 
                    row['SITE'], 
                    row['CUST'],
                    row['ORDER_NUMBER'], 
                    row['ORDER_DATE'],
                    row['PURCHASE_ORDER'], 
                    row['CHANNEL'],
                    row['SHIP_TO'],
                    row['PROD_LINE'],
                    row['GROUP_NUMBER'],
                    row['DESCRIPTION'],
                    row['ITEM'],
                    row['ORDER_QTY'],
                    row['END_DT'],
                    row['BPS_INCLUDE'])

    sql_connect.commit()

cursor.close()
sql_connect.close()

I tried the above on a five-row sample of my original ~50,000-row DataFrame and it worked fine. So the logic seems sound; it is only the speed that is a problem.
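One quick mitigation, before restructuring the whole approach, is to stop committing once per row and to send the parameters in a batch with `cursor.executemany` (with pyodbc's `cursor.fast_executemany = True` set first). A minimal sketch of building the parameter tuples from the DataFrame, assuming the same column names as above (the sample values are made up):

```python
import pandas as pd

# Tiny stand-in for bdf with the same column layout (values are invented)
bdf = pd.DataFrame(
    [['S1', 'C1', 'ORD1', '2019-01-02', 'PO1', 'WEB', 'ST1',
      'PL1', 'G1', 'Widget', 'ITM1', 10, '2019-06-30', 'Y']],
    columns=['SITE', 'CUST', 'ORDER_NUMBER', 'ORDER_DATE', 'PURCHASE_ORDER',
             'CHANNEL', 'SHIP_TO', 'PROD_LINE', 'GROUP_NUMBER', 'DESCRIPTION',
             'ITEM', 'ORDER_QTY', 'END_DT', 'BPS_INCLUDE'])

# Column order must match the placeholders in the INSERT statement
insert_cols = ['SITE', 'CUST', 'ORDER_NUMBER', 'ORDER_DATE', 'PURCHASE_ORDER',
               'CHANNEL', 'SHIP_TO', 'PROD_LINE', 'GROUP_NUMBER', 'DESCRIPTION',
               'ITEM', 'ORDER_QTY', 'END_DT', 'BPS_INCLUDE']

# itertuples is much faster than iterrows, and a list of plain tuples is
# exactly the shape cursor.executemany(insert_sql, params) expects
params = list(bdf[insert_cols].itertuples(index=False, name=None))
print(params[0][:3])  # ('S1', 'C1', 'ORD1')
```

After building `params`, you would call `cursor.executemany(insert_sql, params)` once and commit once after the loop, instead of a round trip and a commit per row. This only batches the INSERT side, though; the temp-table approach in the answer below handles the update-or-insert logic as a whole.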

python sql sql-server pandas pyodbc
1 Answer

1 vote

A comment on the question suggested uploading the DataFrame to a temporary table and then merging its contents into the main table. Note, however, that the documentation for the T-SQL MERGE statement says:

Performance tip: The conditional behavior described for the MERGE statement works best when the two tables have a complex mixture of matching characteristics. For example, inserting a row if it does not exist, or updating a row if it does match. When simply updating one table based on the rows of another table, improved performance and scalability can be achieved with basic INSERT, UPDATE, and DELETE statements.
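For comparison, a MERGE-based version of the temp-table step would look roughly like the statement below (the table and column names match the example further down; this sketch is shown only to illustrate the alternative that the performance tip advises against in this simple case):

```python
# Hypothetical MERGE equivalent of the UPDATE-then-INSERT block used later.
# Not executed here; shown only for comparison with the recommended approach.
merge_sql = """\
MERGE actual_table AS a
USING #update_table AS u
    ON a.institution_no = u.institution_no
   AND a.transit_no = u.transit_no
WHEN MATCHED THEN
    UPDATE SET branch_name = u.branch_name
WHEN NOT MATCHED THEN
    INSERT (institution_no, transit_no, branch_name)
    VALUES (u.institution_no, u.transit_no, u.branch_name);
"""
```

Note that MERGE requires the terminating semicolon, and that for a simple single-key match like this the separate UPDATE and INSERT statements below are the better-performing choice per the documentation.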

In your case the matching criteria are relatively simple (effectively a multi-column primary key), so you can simply use an anonymous code block with an UPDATE statement and an INSERT statement, as in the simplified MCVE code below.

Requirements:

  • Python 3.6+ for the f'...' string formatting
  • SQLAlchemy 1.3 for the fast_executemany parameter of create_engine
  • DRIVER=ODBC Driver 17 for SQL Server;UseFMTONLY=Yes; for reliable fast_executemany INSERTs into a SQL Server #temporary table

import pandas as pd
import pyodbc
from sqlalchemy import __version__ as sa_version, create_engine, text
import sys
import urllib

print(sys.version)
# 3.7.2 (tags/v3.7.2:9a3ffc0492, Dec 23 2018, 23:09:28) [MSC v.1916 64 bit (AMD64)]
print(f'SQLAlchemy {sa_version}, pandas {pd.__version__}, pyodbc {pyodbc.version}')
# SQLAlchemy 1.3.0b2, pandas 0.24.1, pyodbc 4.0.25

connection_string = (
    r'DRIVER=ODBC Driver 17 for SQL Server;'
    r'SERVER=(local)\SQLEXPRESS;'
    r'DATABASE=myDb;'
    r'Trusted_Connection=Yes;'
    r'UseFMTONLY=Yes;'
)
sqlalchemy_url = (
    'mssql+pyodbc:///?odbc_connect=' \
    + urllib.parse.quote_plus(connection_string)
)
engine = create_engine(sqlalchemy_url, fast_executemany=True)

# set up test environment
if 0 == engine.execute("SELECT COUNT(*) FROM sys.tables WHERE name='actual_table';").fetchone()[0] :
    engine.execute("""\
    CREATE TABLE actual_table (
        institution_no VARCHAR(3), 
        transit_no VARCHAR(5), 
        branch_name VARCHAR(50),
        CONSTRAINT PK_actual_table PRIMARY KEY CLUSTERED 
            (institution_no, transit_no));
        """)
else:
    # clear out previous test data
    engine.execute(text("TRUNCATE TABLE actual_table;").execution_options(autocommit=True))
# actual_table initial state
engine.execute("""\
INSERT INTO actual_table (institution_no, transit_no, branch_name) VALUES 
    ('002', '45678', 'Scotiabank branch #45678 - *** UPDATE NEEDED ***'),
    ('003', '67890', 'RBC branch #67890 - Sudbury, ON');
""")

# test data to be updated or inserted
update_columns = ['institution_no', 'transit_no', 'branch_name']
update_data = [
    ['004', '12345', 'TD branch #12345 - London, ON'],
    ['002', '45678', 'Scotiabank branch #45678 - Timmins, ON'],
    ['004', '34567', 'TD branch #34567 - Toronto, ON'],
]
df_update = pd.DataFrame(update_data, columns=update_columns)

# Here's where the real work begins ...
#
# Step 1: upload update data
df_update.to_sql('#update_table', engine, index=False)
#
# Step 2: perform the "upsert"
sql = """\
SET NOCOUNT ON;
DECLARE @rows_updated INT = 0;
DECLARE @rows_inserted INT = 0;

UPDATE a SET a.branch_name = u.branch_name
    FROM actual_table a INNER JOIN #update_table u
        ON a.institution_no = u.institution_no 
            AND a.transit_no = u.transit_no;
SELECT @rows_updated = @@ROWCOUNT;

INSERT INTO actual_table (institution_no, transit_no, branch_name)
    SELECT institution_no, transit_no, branch_name
    FROM #update_table u
    WHERE NOT EXISTS (
        SELECT * FROM actual_table
        WHERE institution_no = u.institution_no
            AND transit_no = u.transit_no
    );
SELECT @rows_inserted = @@ROWCOUNT;

SELECT @rows_updated AS rows_updated, @rows_inserted AS rows_inserted;
"""
cnxn = engine.raw_connection()
result = cnxn.execute(sql).fetchone()
cnxn.commit()
print(f'{result[0]} row(s) updated, {result[1]} row(s) inserted')
# 1 row(s) updated, 2 row(s) inserted

# verify results
print(cnxn.execute("SELECT * FROM actual_table").fetchall())
# [('002', '45678', 'Scotiabank branch #45678 - Timmins, ON'),
#  ('003', '67890', 'RBC branch #67890 - Sudbury, ON'),
#  ('004', '12345', 'TD branch #12345 - London, ON'),
#  ('004', '34567', 'TD branch #34567 - Toronto, ON')]
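Adapting the pattern above to the question's dbo.MPA_BOOK_RAW table mostly means swapping in its seven-column matching key. A hypothetical helper (the function name is mine, not from the original answer) that builds the UPDATE-then-INSERT block from a table name, key columns, and non-key columns might look like this; it only constructs the SQL string, using the column names from the question:

```python
def build_upsert_sql(table, temp_table, key_cols, val_cols):
    """Build the UPDATE-then-INSERT 'upsert' block shown above.

    Assumes key_cols/val_cols are trusted identifiers: they are interpolated
    directly into the SQL, so never pass user-supplied names here.
    """
    on_clause = ' AND '.join(f'a.[{c}] = u.[{c}]' for c in key_cols)
    set_clause = ', '.join(f'a.[{c}] = u.[{c}]' for c in val_cols)
    all_cols = ', '.join(f'[{c}]' for c in key_cols + val_cols)
    where_clause = ' AND '.join(f't.[{c}] = u.[{c}]' for c in key_cols)
    return f"""\
UPDATE a SET {set_clause}
    FROM {table} a INNER JOIN {temp_table} u ON {on_clause};
INSERT INTO {table} ({all_cols})
    SELECT {all_cols} FROM {temp_table} u
    WHERE NOT EXISTS (
        SELECT * FROM {table} t WHERE {where_clause});
"""

sql = build_upsert_sql(
    'dbo.MPA_BOOK_RAW', '#update_table',
    key_cols=['CUST', 'ORDER_NUMBER', 'ORDER_DATE', 'PURCHASE_ORDER',
              'CHANNEL', 'ITEM', 'END_DT'],
    val_cols=['SITE', 'SHIP_TO', 'PROD_LINE', 'GROUP_NUMBER',
              'DESCRIPTION', 'ORDER_QTY', 'BPS_INCLUDE'])
print(sql)
```

As in the worked example, you would upload the DataFrame to #update_table with to_sql and fast_executemany, then execute the generated block once on the raw connection and commit.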