我想将一个大的pandas.DataFrame
发送到运行MS SQL的远程服务器。我现在这样做的方法是将data_frame
对象转换为元组列表,然后使用pyODBC的executemany()
函数将其发送出去。它是这样的:
import pyodbc as pdb
list_of_tuples = convert_df(data_frame)
connection = pdb.connect(cnxn_str)
cursor = connection.cursor()
cursor.fast_executemany = True
cursor.executemany(sql_statement, list_of_tuples)
connection.commit()
cursor.close()
connection.close()
然后我开始怀疑使用data_frame.to_sql()
方法是否可以加速(或至少更具可读性)。我想出了以下解决方案:
import sqlalchemy as sa
engine = sa.create_engine("mssql+pyodbc:///?odbc_connect=%s" % cnxn_str)
data_frame.to_sql(table_name, engine, index=False)
现在代码更具可读性,但上传速度至少慢150倍......
有没有办法在使用SQLAlchemy时翻转fast_executemany
?
我正在使用pandas-0.20.3,pyODBC-4.0.21和sqlalchemy-1.1.13。
编辑(08/03/2019):Gord Thompson在下面评论了sqlalchemy的更新日志中的好消息:自从2019-03-04发布的SQLAlchemy 1.3.0以来,sqlalchemy现在支持engine = create_engine(sqlalchemy_url, fast_executemany=True)
用于mssql+pyodbc
方言。即,不再需要定义函数并使用@event.listens_for(engine, 'before_cursor_execute')
意味着可以删除下面的函数,只需要在create_engine语句中设置标志 - 并且仍然保持加速。
原帖:
刚刚发了一个帐号来发布这个。我想在上面的帖子下面发表评论,因为它是已经提供的答案的后续内容。上面的解决方案适用于使用基于Ubuntu的安装的Microsft SQL存储上的Version 17 SQL驱动程序。
我用来显着提高速度的完整代码(谈话> 100倍加速)低于。这是一个交钥匙片段,前提是您可以使用相关详细信息更改连接字符串。在上面的海报中,非常感谢你的解决方案,因为我已经看了很长时间了。
import pandas as pd
import numpy as np
import time
from sqlalchemy import create_engine, event
from urllib.parse import quote_plus
conn = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=IP_ADDRESS;DATABASE=DataLake;UID=USER;PWD=PASS"
quoted = quote_plus(conn)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
engine = create_engine(new_con)
@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
print("FUNC call")
if executemany:
cursor.fast_executemany = True
table_name = 'fast_executemany_test'
df = pd.DataFrame(np.random.random((10**4, 100)))
s = time.time()
df.to_sql(table_name, engine, if_exists = 'replace', chunksize = None)
print(time.time() - s)
基于下面的评论,我想花一些时间来解释一些关于pandas to_sql
实现和查询处理方式的限制。有两件事可能会导致MemoryError
被提升为afaik:
1)假设您正在写入远程SQL存储。当您尝试使用to_sql
方法编写大型pandas DataFrame时,它会将整个数据帧转换为值列表。这种转换占用了比原始DataFrame更多的RAM(在它之上,因为旧的DataFrame仍然存在于RAM中)。此列表提供给ODBC连接器的最终executemany
调用。我认为ODBC连接器在处理如此大的查询时遇到了一些麻烦。解决这个问题的一种方法是为to_sql
方法提供一个chunksize参数(10 ** 5似乎是最佳的,在Azure的2 CPU 7GB ram MSSQL存储应用程序上提供大约600 mbit / s(!)的写入速度 - 不能推荐Azure btw)。因此,第一个限制是查询大小,可以通过提供chunksize
参数来规避。但是,这不会使您编写大小为10 ** 7或更大的数据帧(至少不在我正在使用的具有~55GB RAM的VM上),因为发布nr 2。
这可以通过使用np.split
(10 ** 6大小的DataFrame块)拆分DataFrame来规避。这些可以迭代地写出来。当我为pandas本身的核心中的to_sql
方法准备好解决方案时,我会尝试发出拉取请求,这样你就不必每次都预先分解。无论如何,我最终写了一个类似的功能(不是交钥匙)到下面:
import pandas as pd
import numpy as np
def write_df_to_sql(df, **kwargs):
chunks = np.split(df, df.shape()[0] / 10**6)
for chunk in chunks:
chunk.to_sql(**kwargs)
return True
可以在此处查看上述代码段的更完整示例:https://gitlab.com/timelord/timelord/blob/master/timelord/utils/connector.py
这是我编写的一个包含补丁的类,并简化了与SQL建立连接所带来的一些必要开销。还是要写一些文档。此外,我还计划为大熊猫本身提供补丁,但还没有找到一个很好的方法来解决这个问题。
我希望这有帮助。
在联系了SQLAlchemy的开发人员之后,出现了一种解决这个问题的方法。非常感谢他们的出色工作!
必须使用游标执行事件并检查是否已引发executemany
标志。如果确实如此,请打开fast_executemany
选项。例如:
from sqlalchemy import event
@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
if executemany:
cursor.fast_executemany = True
有关执行事件的更多信息可以在here找到。
更新:在fast_executemany
中添加了对pyodbc
的SQLAlchemy 1.3.0的支持,因此不再需要这个hack。
我只想将这个完整的示例作为一个额外的高性能选项发布给那些可以使用新turbodbc库的人:http://turbodbc.readthedocs.io/en/latest/
在pandas .to_sql(),通过sqlalchemy触发fast_executemany,使用pyodbc直接使用tuples / lists /等,甚至尝试BULK UPLOAD与平面文件之间有很多选择。
希望以下可能会使当前的熊猫项目中的功能发展变得更加愉快,或者包括未来的turbodbc集成。
import pandas as pd
import numpy as np
from turbodbc import connect, make_options
from io import StringIO
test_data = '''id,transaction_dt,units,measures
1,2018-01-01,4,30.5
1,2018-01-03,4,26.3
2,2018-01-01,3,12.7
2,2018-01-03,3,8.8'''
df_test = pd.read_csv(StringIO(test_data), sep=',')
df_test['transaction_dt'] = pd.to_datetime(df_test['transaction_dt'])
options = make_options(parameter_sets_to_buffer=1000)
conn = connect(driver='{SQL Server}', server='server_nm', database='db_nm', turbodbc_options=options)
test_query = '''DROP TABLE IF EXISTS [db_name].[schema].[test]
CREATE TABLE [db_name].[schema].[test]
(
id int NULL,
transaction_dt datetime NULL,
units int NULL,
measures float NULL
)
INSERT INTO [db_name].[schema].[test] (id,transaction_dt,units,measures)
VALUES (?,?,?,?) '''
cursor.executemanycolumns(test_query, [df_test['id'].values, df_test['transaction_dt'].values, df_test['units'].values, df_test['measures'].values]
在很多用例中,turbodbc应该非常快(特别是对于numpy数组)。请注意直接将基础numpy数组从dataframe列作为参数传递给查询是多么简单。我也相信这有助于防止创建过度刺激内存消耗的中间对象。希望这有用!
我遇到了同样的问题,但使用PostgreSQL。他们现在只发布了pandas版本0.24.0,to_sql
函数中有一个名为method
的新参数解决了我的问题。
from sqlalchemy import create_engine
engine = create_engine(your_options)
data_frame.to_sql(table_name, engine, method="multi")
上传速度对我来说快了100倍。如果要发送大量数据,我还建议设置chunksize
参数。
似乎Pandas 0.23.0和0.24.0 use multi values inserts与PyODBC,这阻止快速执行帮助 - 每个块发出一个INSERT ... VALUES ...
语句。多值插入块是对旧的慢执行默认值的改进,但至少在简单测试中,快速执行方法仍然占优势,更不用说手动chunksize
计算,如多值插入所需。如果将来未提供配置选项,则可以通过monkeypatching强制执行旧行为:
import pandas.io.sql
def insert_statement(self, data, conn):
return self.table.insert(), data
pandas.io.sql.SQLTable.insert_statement = insert_statement
未来就在这里,至少在master
分支中,可以使用method=
的关键字参数to_sql()
来控制插入方法。它默认为None
,它强制执行executemany方法。传递method='multi'
会导致使用多值插入。它甚至可以用于实现DBMS特定的方法,例如Postgresql COPY
。
当使用to_sql
将pandas DataFrame上传到SQL Server时,如果没有fast_executemany
,turbodbc肯定会比pyodbc更快。但是,使用fast_executemany
启用pyodbc,两种方法都可以产生基本相同的性能。
测试环境:
[venv1_pyodbc] pyodbc 2.0.25
[venv2_turbodbc] turbodbc 3.0.0 sqlalchemy-turbodbc 0.1.0
[两者共通] Python 3.6.4在Windows上64位 SQLAlchemy 1.3.0b1 大熊猫0.23.4 numpy 1.15.4
测试代码:
# for pyodbc
engine = create_engine('mssql+pyodbc://sa:whatever@SQL_panorama', fast_executemany=True)
# for turbodbc
# engine = create_engine('mssql+turbodbc://sa:whatever@SQL_panorama')
# test data
num_rows = 10000
num_cols = 100
df = pd.DataFrame(
[[f'row{x:04}col{y:03}' for y in range(num_cols)] for x in range(num_rows)],
columns=[f'col{y:03}' for y in range(num_cols)]
)
t0 = time.time()
df.to_sql("sqlalchemy_test", engine, if_exists='replace', index=None)
print(f"pandas wrote {num_rows} rows in {(time.time() - t0):0.1f} seconds")
对于每个环境,测试运行十二(12)次,丢弃每个环境的单个最佳和最差时间。结果(以秒为单位):
rank pyodbc turbodbc
---- ------ --------
1 22.8 27.5
2 23.4 28.1
3 24.6 28.2
4 25.2 28.5
5 25.7 29.3
6 26.9 29.9
7 27.0 31.4
8 30.1 32.1
9 33.6 32.5
10 39.8 32.9
---- ------ --------
average 27.9 30.0
正如@Pylander所指出的那样
到目前为止,Turbodbc是数据摄取的最佳选择!
我很兴奋,我在我的github和媒体上写了一个'博客':请检查https://medium.com/@erickfis/etl-process-with-turbodbc-1d19ed71510e
一个工作示例并与pandas.to_sql进行比较
长话短说,
使用turbodbc我在3秒钟内获得了10000行(77列)
与pandas.to_sql我在198秒内得到相同的10000行(77列)...
这就是我正在做的全部细节
进口:
import sqlalchemy
import pandas as pd
import numpy as np
import turbodbc
import time
加载并处理一些数据 - 用我的sample.pkl替换你的:
df = pd.read_pickle('sample.pkl')
df.columns = df.columns.str.strip() # remove white spaces around column names
df = df.applymap(str.strip) # remove white spaces around values
df = df.replace('', np.nan) # map nans, to drop NAs rows and columns later
df = df.dropna(how='all', axis=0) # remove rows containing only NAs
df = df.dropna(how='all', axis=1) # remove columns containing only NAs
df = df.replace(np.nan, 'NA') # turbodbc hates null values...
使用sqlAlchemy创建表
不幸的是,turbodbc需要大量的开销和大量的sql手工劳动,用于创建表和在其上插入数据。
幸运的是,Python非常高兴,我们可以自动完成编写SQL代码的过程。
第一步是创建将接收我们数据的表。但是,如果您的表具有多个列,则手动创建表编写SQL代码可能会有问题。在我的情况下,表通常有240列!
这是sqlAlchemy和pandas仍然可以帮助我们的地方:pandas对于编写大量行(本例中为10000)不好,但是只有6行,表的头部呢?这样,我们就可以自动创建表格。
创建sqlAlchemy连接:
mydb = 'someDB'
def make_con(db):
"""Connect to a specified db."""
database_connection = sqlalchemy.create_engine(
'mssql+pymssql://{0}:{1}@{2}/{3}'.format(
myuser, mypassword,
myhost, db
)
)
return database_connection
pd_connection = make_con(mydb)
在SQL Server上创建表
使用pandas + sqlAlchemy,但只是为前面提到的turbodbc准备空间。请注意这里的df.head():我们使用pandas + sqlAlchemy只插入6行数据。这将运行得非常快,并且正在完成自动化表创建。
table = 'testing'
df.head().to_sql(table, con=pd_connection, index=False)
现在桌子已经到位,让我们在这里认真对待。
Turbodbc连接:
def turbo_conn(mydb):
"""Connect to a specified db - turbo."""
database_connection = turbodbc.connect(
driver='ODBC Driver 17 for SQL Server',
server=myhost,
database=mydb,
uid=myuser,
pwd=mypassword
)
return database_connection
为turbodbc准备sql命令和数据。让我们自动创建具有创意的代码:
def turbo_write(mydb, df, table):
"""Use turbodbc to insert data into sql."""
start = time.time()
# preparing columns
colunas = '('
colunas += ', '.join(df.columns)
colunas += ')'
# preparing value place holders
val_place_holder = ['?' for col in df.columns]
sql_val = '('
sql_val += ', '.join(val_place_holder)
sql_val += ')'
# writing sql query for turbodbc
sql = f"""
INSERT INTO {mydb}.dbo.{table} {colunas}
VALUES {sql_val}
"""
# writing array of values for turbodbc
valores_df = [df[col].values for col in df.columns]
# cleans the previous head insert
with connection.cursor() as cursor:
cursor.execute(f"delete from {mydb}.dbo.{table}")
connection.commit()
# inserts data, for real
with connection.cursor() as cursor:
try:
cursor.executemanycolumns(sql, valores_df)
connection.commit()
except Exception:
connection.rollback()
print('something went wrong')
stop = time.time() - start
return print(f'finished in {stop} seconds')
使用turbodbc写入数据 - 我在3秒钟内获得了10000行(77列):
turbo_write(mydb, df.sample(10000), table)
熊猫方法比较 - 我在198秒内得到了相同的10000行(77列)......
table = 'pd_testing'
def pandas_comparisson(df, table):
"""Load data using pandas."""
start = time.time()
df.to_sql(table, con=pd_connection, index=False)
stop = time.time() - start
return print(f'finished in {stop} seconds')
pandas_comparisson(df.sample(10000), table)
环境和条件
Python 3.6.7 :: Anaconda, Inc.
TURBODBC version ‘3.0.0’
sqlAlchemy version ‘1.2.12’
pandas version ‘0.23.4’
Microsoft SQL Server 2014
user with bulk operations privileges
请查看https://erickfis.github.io/loose-code/以获取此代码中的更新!
只是想加入@ J.K.的答案。
如果您使用此方法:
@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
if executemany:
cursor.fast_executemany = True
你收到这个错误:
“sqlalchemy.exc.DBAPIError:(pyodbc.Error)('HY010','[HY010] [Microsoft] [SQL Server Native Client 11.0]函数序列错误(0)(SQLParamData)')[SQL:'INSERT INTO .. 。(...)VALUES(?,?)'] [参数:((...,...),(...,...)](关于此错误的背景:http://sqlalche.me/e/dbapi)“
像这样编码你的字符串值:'yourStringValue'.encode('ascii')
这将解决您的问题。