当 SQL 跟踪/扩展事件运行时,pandas to_sql 很慢

问题描述 投票:0回答:1

我们经常使用 pandas

to_sql
将 csv 文件加载到现有表中。我们通常没有问题,因为我们使用
fast_executemany=True
。一个 40MB(350K 记录)的 csv 文件在 10 秒内加载。

但我们已经开始遇到一个问题,即即使使用

to_sql
fast_executemany
也非常慢。它出现是由于 SQL Sentry SQL Monitor 在幕后运行 SQL 跟踪。

但我可以在没有 SQL Sentry 的情况下使用简单的跟踪进行重现。

  • 当我在生产服务器(相同的文件和数据类型)上运行它时,它由 SQL Sentry 跟踪监视,它在我停止它之前运行了 10 多分钟(没有阻塞 - 我可以看到表计数缓慢增加).

  • 当我在开发服务器(没有 SQL Sentry)上运行它时,它会在 10-15 秒内完成。

  • 当我再次在开发服务器上运行它但运行扩展事件捕获(或分析器跟踪)时,它运行缓慢,就像产品一样。如果我暂停跟踪,它会立即再次变快。

为什么一条痕迹会产生如此巨大的影响?因为它必须生成大量

sp_execute
语句?有解决方法吗?

我将与 DBA 讨论他们在生产环境中捕获的事件以及他们是否可以减少开销。这是一个全天监控套件。

当我运行跟踪并在 to_sql 中使用 chunksize 参数时,我也看到了一些不同的结果。

  • pandas 2.0(还有 1.4)
  • pyodbc 4.0.35
  • SQL 服务器 2017
import urllib
import sqlalchemy as sa
import pandas as pd
host = 'my_server'
schema = 'workdb'

params = urllib.parse.quote_plus("DRIVER={ODBC Driver 17 for SQL Server};"
                                 "SERVER=" + host + ";"
                                "DATABASE=" + schema + ";"
                               "trusted_connection=yes;")

engine = sa.create_engine("mssql+pyodbc:///?odbc_connect={}".format(params), fast_executemany=True)
csv_path = r'C:\Users\me\Desktop\somefile.csv' # 40mb file

df = pd.read_csv(csv_path, dtype_backend='pyarrow')  # pyarrow for pandas 2.0+.
df.to_sql(con=engine, name="target_table", schema="import", index=False, if_exists='append')
The CSV file is something like this:
day,ds,gender,age_group,country,device,dormancy_cohort,reg_id,uid
2023-04-17,20230417,1,0,GBR,Android,4,03f9dfza868sb58zza0s8cd0d6f4,b2406ea4da557s9a65926az804
sql-server pandas sqlalchemy sentry
1个回答
0
投票

痕迹确实有那么大的影响。我原以为 execute_many 的影响很小,类似于 SSIS 快速加载(又名批量插入),但事实并非如此。它仍然需要产生许多

sp_prepare
电话

只需关闭生产环境中的监控(SQL 跟踪)即可将 prod 脚本从 8 分钟减少到 40 秒,快 8 倍!

如果仍然需要监控(出于审计或其他原因),@gordthompson 提供了一个使用 OPENJSON 的解决方法。

# Alternative to_sql() *method* for mssql+pyodbc or mssql+pymssql
#
# adapted from https://pandas.pydata.org/docs/user_guide/io.html#io-sql-method
#
# version 1.2 - 2023-05-08
from datetime import date
import json

import pandas as pd
import sqlalchemy as sa


def mssql_insert_json(table, conn, keys, data_iter):
    """
    Execute SQL statement inserting data via OPENJSON
    Parameters
    ----------
    table : pandas.io.sql.SQLTable
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
    keys : list of str
        Column names
    data_iter : Iterable that iterates the values to be inserted
    """
    # build dict of {"column_name": "column_type"}
    col_dict = {
        str(col.name): "varchar(max)"
        if str(col.type) == "TEXT"
        else "nvarchar(max)"
        if str(col.type) == "NTEXT"
        else str(col.type)
        for col in table.table.columns
    }
    columns = ", ".join([f"[{k}]" for k in keys])
    if table.schema:
        table_name = f"[{table.schema}].[{table.name}]"
    else:
        table_name = f"[{table.name}]"

    json_data = [dict(zip(keys, row)) for row in data_iter]

    with_clause = ",\n".join(
        [
            f"[{col_name}] {col_type} '$.\"{col_name}\"'"
            for col_name, col_type in col_dict.items()
        ]
    )
    placeholder = "?" if conn.dialect.paramstyle == "qmark" else "%s"
    sql = f"""\
    INSERT INTO {table_name} ({columns})
    SELECT {columns}
    FROM OPENJSON({placeholder})
    WITH
    (
    {with_clause}
    );
    """

    conn.exec_driver_sql(sql, (json.dumps(json_data, default=str),))


if __name__ == "__main__":
    # =============
    # USAGE EXAMPLE
    # =============

    # note: fast_executemany=True is not required
    engine = sa.create_engine("mssql+pyodbc://scott:tiger^5HHH@mssql_199")

    df = pd.DataFrame(
        [(1, "Alfa", date(2001, 1, 1)), (2, "Bravo", date(2002, 2, 2))],
        columns=["id", "my text", "date added"],
    )

    df.to_sql(
        "##tmp",
        engine,
        index=False,
        if_exists="append",
        method=mssql_insert_json,
    )

    # check result
    with engine.begin() as connection:
        print(connection.exec_driver_sql("SELECT * FROM ##tmp").all())
        # [(1, 'Alfa', datetime.date(2001, 1, 1)), (2, 'Bravo', datetime.date(2002, 2, 2))]

"""The INSERT statement generated for this example is:
INSERT INTO [##tmp] ([id], [my text], [date added])
SELECT [id], [my text], [date added]
FROM OPENJSON(?)
WITH
(
[id] BIGINT '$."id"',
[my text] varchar(max) '$."my text"',
[date added] DATE '$."date added"'
);
"""

更多讨论在这里:

最后,如果可以的话,考虑使用

bcp

© www.soinside.com 2019 - 2024. All rights reserved.