我们经常使用 pandas
to_sql
将 csv 文件加载到现有表中。我们通常没有问题,因为我们使用fast_executemany=True
。一个 40MB(350K 记录)的 csv 文件在 10 秒内加载。
但我们已经开始遇到一个问题,即即使使用
to_sql
,fast_executemany
也非常慢。它出现是由于 SQL Sentry SQL Monitor 在幕后运行 SQL 跟踪。
但我可以在没有 SQL Sentry 的情况下使用简单的跟踪进行重现。
当我在生产服务器(相同的文件和数据类型)上运行它时,它由 SQL Sentry 跟踪监视,它在我停止它之前运行了 10 多分钟(没有阻塞 - 我可以看到表计数缓慢增加).
当我在开发服务器(没有 SQL Sentry)上运行它时,它会在 10-15 秒内完成。
当我再次在开发服务器上运行它但运行扩展事件捕获(或分析器跟踪)时,它运行缓慢,就像产品一样。如果我暂停跟踪,它会立即再次变快。
为什么一条痕迹会产生如此巨大的影响?因为它必须生成大量
sp_execute
语句?有解决方法吗?
我将与 DBA 讨论他们在生产环境中捕获的事件以及他们是否可以减少开销。这是一个全天监控套件。
当我运行跟踪并在 to_sql 中使用 chunksize 参数时,我也看到了一些不同的结果。
import urllib
import sqlalchemy as sa
import pandas as pd
host = 'my_server'
schema = 'workdb'
params = urllib.parse.quote_plus("DRIVER={ODBC Driver 17 for SQL Server};"
"SERVER=" + host + ";"
"DATABASE=" + schema + ";"
"trusted_connection=yes;")
engine = sa.create_engine("mssql+pyodbc:///?odbc_connect={}".format(params), fast_executemany=True)
csv_path = r'C:\Users\me\Desktop\somefile.csv' # 40mb file
df = pd.read_csv(csv_path, dtype_backend='pyarrow') # pyarrow for pandas 2.0+.
df.to_sql(con=engine, name="target_table", schema="import", index=False, if_exists='append')
The CSV file is something like this:
day,ds,gender,age_group,country,device,dormancy_cohort,reg_id,uid
2023-04-17,20230417,1,0,GBR,Android,4,03f9dfza868sb58zza0s8cd0d6f4,b2406ea4da557s9a65926az804
痕迹确实有那么大的影响。我原以为 execute_many 的影响很小,类似于 SSIS 快速加载(又名批量插入),但事实并非如此。它仍然需要产生许多
sp_prepare
电话
只需关闭生产环境中的监控(SQL 跟踪)即可将 prod 脚本从 8 分钟减少到 40 秒,快 8 倍!
如果仍然需要监控(出于审计或其他原因),@gordthompson 提供了一个使用 OPENJSON 的解决方法。
# Alternative to_sql() *method* for mssql+pyodbc or mssql+pymssql
#
# adapted from https://pandas.pydata.org/docs/user_guide/io.html#io-sql-method
#
# version 1.2 - 2023-05-08
from datetime import date
import json
import pandas as pd
import sqlalchemy as sa
def mssql_insert_json(table, conn, keys, data_iter):
"""
Execute SQL statement inserting data via OPENJSON
Parameters
----------
table : pandas.io.sql.SQLTable
conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
keys : list of str
Column names
data_iter : Iterable that iterates the values to be inserted
"""
# build dict of {"column_name": "column_type"}
col_dict = {
str(col.name): "varchar(max)"
if str(col.type) == "TEXT"
else "nvarchar(max)"
if str(col.type) == "NTEXT"
else str(col.type)
for col in table.table.columns
}
columns = ", ".join([f"[{k}]" for k in keys])
if table.schema:
table_name = f"[{table.schema}].[{table.name}]"
else:
table_name = f"[{table.name}]"
json_data = [dict(zip(keys, row)) for row in data_iter]
with_clause = ",\n".join(
[
f"[{col_name}] {col_type} '$.\"{col_name}\"'"
for col_name, col_type in col_dict.items()
]
)
placeholder = "?" if conn.dialect.paramstyle == "qmark" else "%s"
sql = f"""\
INSERT INTO {table_name} ({columns})
SELECT {columns}
FROM OPENJSON({placeholder})
WITH
(
{with_clause}
);
"""
conn.exec_driver_sql(sql, (json.dumps(json_data, default=str),))
if __name__ == "__main__":
# =============
# USAGE EXAMPLE
# =============
# note: fast_executemany=True is not required
engine = sa.create_engine("mssql+pyodbc://scott:tiger^5HHH@mssql_199")
df = pd.DataFrame(
[(1, "Alfa", date(2001, 1, 1)), (2, "Bravo", date(2002, 2, 2))],
columns=["id", "my text", "date added"],
)
df.to_sql(
"##tmp",
engine,
index=False,
if_exists="append",
method=mssql_insert_json,
)
# check result
with engine.begin() as connection:
print(connection.exec_driver_sql("SELECT * FROM ##tmp").all())
# [(1, 'Alfa', datetime.date(2001, 1, 1)), (2, 'Bravo', datetime.date(2002, 2, 2))]
"""The INSERT statement generated for this example is:
INSERT INTO [##tmp] ([id], [my text], [date added])
SELECT [id], [my text], [date added]
FROM OPENJSON(?)
WITH
(
[id] BIGINT '$."id"',
[my text] varchar(max) '$."my text"',
[date added] DATE '$."date added"'
);
"""
更多讨论在这里:
最后,如果可以的话,考虑使用
bcp
。