如何使用 pyodbc 加速批量插入 MS SQL Server

问题描述 投票:0回答:6

下面是我的代码,我需要一些帮助。 我必须运行超过 1,300,000 行,这意味着需要 40 分钟 才能插入约 300,000 行。

我认为批量插入是加快速度的途径? 或者是因为我通过

for data in reader:
部分迭代行?

#Opens the prepped csv file
with open (os.path.join(newpath,outfile), 'r') as f:
    #hooks csv reader to file
    reader = csv.reader(f)
    #pulls out the columns (which match the SQL table)
    columns = next(reader)
    #trims any extra spaces
    columns = [x.strip(' ') for x in columns]
    #starts SQL statement
    query = 'bulk insert into SpikeData123({0}) values ({1})'
    #puts column names in SQL query 'query'
    query = query.format(','.join(columns), ','.join('?' * len(columns)))

    print 'Query is: %s' % query
    #starts curser from cnxn (which works)
    cursor = cnxn.cursor()
    #uploads everything by row
    for data in reader:
        cursor.execute(query, data)
        cursor.commit()

我有意动态地选择我的列标题(因为我想创建尽可能的Pythonic代码)。

SpikeData123 是表名称。

python sql-server csv pyodbc bulkinsert
6个回答
69
投票

如另一个答案的评论中所述,仅当要导入的文件与 SQL Server 实例位于同一台计算机上或位于 SQL Server 所在的 SMB/CIFS 网络位置时,T-SQL

BULK INSERT
命令才有效。实例可以读取。因此,它可能不适用于源文件位于远程客户端的情况。

pyodbc 4.0.19 添加了 Cursor#fast_executemany 功能,在这种情况下可能会有所帮助。

fast_executemany
默认为“关闭”,以下测试代码...

cnxn = pyodbc.connect(conn_str, autocommit=True)
crsr = cnxn.cursor()
crsr.execute("TRUNCATE TABLE fast_executemany_test")

sql = "INSERT INTO fast_executemany_test (txtcol) VALUES (?)"
params = [(f'txt{i:06d}',) for i in range(1000)]
t0 = time.perf_counter()
crsr.executemany(sql, params)
print(f'{time.perf_counter() - t0:.1f} seconds')

...在我的测试机器上执行大约需要 22 秒。只需添加

crsr.fast_executemany = True
...

cnxn = pyodbc.connect(conn_str, autocommit=True)
crsr = cnxn.cursor()
crsr.execute("TRUNCATE TABLE fast_executemany_test")

crsr.fast_executemany = True  # new in pyodbc 4.0.19

sql = "INSERT INTO fast_executemany_test (txtcol) VALUES (?)"
params = [(f'txt{i:06d}',) for i in range(1000)]
t0 = time.perf_counter()
crsr.executemany(sql, params)
print(f'{time.perf_counter() - t0:.1f} seconds')

...将执行时间缩短至 1 秒多一点。


52
投票

更新 - 2022 年 5 月:bcpandasbcpyaz 是 Microsoft

bcp
实用程序的包装器。


更新 - 2019 年 4 月:正如 @SimonLang 的评论中所述,SQL Server 2017 及更高版本下的

BULK INSERT
显然支持 CSV 文件中的文本限定符(参考:here)。


批量插入几乎肯定比逐行读取源文件并为每行执行常规插入要快得多。但是,BULK INSERT 和 BCP 对于 CSV 文件都有一个重大限制,因为它们无法处理文本限定符(参考:here)。也就是说,如果您的 CSV 文件中包含合格的文本字符串... 1,Gord Thompson,2015-04-15 2,Bob Loblaw,2015-04-07

...那么您可以批量插入它,但如果它包含文本限定符(因为某些文本值包含逗号)...

1,"Thompson, Gord",2015-04-15 2,"Loblaw, Bob",2015-04-07

...那么 BULK INSERT 无法处理它。尽管如此,将此类 CSV 文件预处理为管道分隔文件总体上可能会更快...

1|Thompson, Gord|2015-04-15 2|Loblaw, Bob|2015-04-07

...或制表符分隔文件(其中 

代表制表符)...

1→Thompson, Gord→2015-04-15
2→Loblaw, Bob→2015-04-07

...然后批量插入该文件。对于后一个(制表符分隔)文件,BULK INSERT 代码将如下所示:

import pypyodbc conn_str = "DSN=myDb_SQLEXPRESS;" cnxn = pypyodbc.connect(conn_str) crsr = cnxn.cursor() sql = """ BULK INSERT myDb.dbo.SpikeData123 FROM 'C:\\__tmp\\biTest.txt' WITH ( FIELDTERMINATOR='\\t', ROWTERMINATOR='\\n' ); """ crsr.execute(sql) cnxn.commit() crsr.close() cnxn.close()

注意:正如评论中提到的,执行 
BULK INSERT

语句仅适用于 SQL Server 实例可以直接读取源文件的情况。对于源文件位于远程客户端的情况,请参阅

此答案


1
投票

我还会仔细检查您的批量插入语法,因为它对我来说看起来不正确。检查 pyodbc 生成的 sql,因为我有一种感觉,它可能只执行普通插入

或者,如果它仍然很慢,我会尝试直接从 sql 使用批量插入,并使用批量插入将整个文件加载到临时表中,然后将相关列插入到正确的表中。或者混合使用批量插入和 bcp 来插入特定列或 OPENROWSET。


1
投票
fast_executemany

没有多大改进。具体来说,Bryan Bailliache 对 max varchar 的评论。我一直在使用 SQLAlchemy,甚至确保更好的数据类型参数也没有解决我的问题;然而,切换到 pyodbc 却做到了。我还采纳了 Michael Moura 使用临时表的建议,发现它节省了更多时间。我写了一个函数,以防有人发现它有用。我编写它是为了获取一个列表或列表列表来插入。我使用 SQLAlchemy 和 Pandas

to_sql
插入相同的数据,从有时需要 40 分钟缩短到不到 4 秒。不过,我可能误用了以前的方法。

连接

def mssql_conn(): conn = pyodbc.connect(driver='{ODBC Driver 17 for SQL Server}', server=os.environ.get('MS_SQL_SERVER'), database='EHT', uid=os.environ.get('MS_SQL_UN'), pwd=os.environ.get('MS_SQL_PW'), autocommit=True) return conn
插入功能

def mssql_insert(table,val_lst,truncate=False,temp_table=False): '''Use as direct connection to database to insert data, especially for large inserts. Takes either a single list (for one row), or list of list (for multiple rows). Can either append to table (default) or if truncate=True, replace existing.''' conn = mssql_conn() cursor = conn.cursor() cursor.fast_executemany = True tt = False qm = '?,' if isinstance(val_lst[0],list): rows = len(val_lst) params = qm * len(val_lst[0]) else: rows = 1 params = qm * len(val_lst) val_lst = [val_lst] params = params[:-1] if truncate: cursor.execute(f"TRUNCATE TABLE {table}") if temp_table: #create a temp table with same schema start_time = time.time() cursor.execute(f"SELECT * INTO ##{table} FROM {table} WHERE 1=0") table = f"##{table}" #set flag to indicate temp table was used tt = True else: start_time = time.time() #insert into either existing table or newly created temp table stmt = f"INSERT INTO {table} VALUES ({params})" cursor.executemany(stmt,val_lst) if tt: #remove temp moniker and insert from temp table dest_table = table[2:] cursor.execute(f"INSERT INTO {dest_table} SELECT * FROM {table}") print('Temp table used!') print(f'{rows} rows inserted into the {dest_table} table in {time.time() - start_time} seconds') else: print('No temp table used!') print(f'{rows} rows inserted into the {table} table in {time.time() - start_time} seconds') cursor.close() conn.close()
我的控制台结果首先使用临时表,然后不使用临时表(在这两种情况下,该表都包含执行时的数据并且 Truncate=True):

No temp table used! 18204 rows inserted into the CUCMDeviceScrape_WithForwards table in 10.595500707626343 seconds Temp table used! 18204 rows inserted into the CUCMDeviceScrape_WithForwards table in 3.810380458831787 seconds



1
投票

这是我关于我所做的测试的博客:

http://jonmorisissqlblog.blogspot.com/2021/05/python-pyodbc-and-batch-inserts-to-sql.html


0
投票

# add the below line for controlling batch size of insert cursor.fast_executemany_rows = batch_size # by default it is 1000

© www.soinside.com 2019 - 2024. All rights reserved.