如标题中所述,我在 python 脚本中使用 SQLAlchemy 和 PYODBC 将大型 csv 文件(最大 14GB)插入本地托管的 SQL Server 数据库中。
我知道我无法在内存中托管 14GB 数据帧,因此我使用 pandas 中的块功能来运行批量插入,并尝试了小至 100 行的批量大小,可以轻松装入内存。
我有一种感觉,内存错误与 SQL 方面有关。为了最大限度地减少负载处理,我在插入的表(哈希表)上没有任何索引。无论批量大小如何,我在加载过程的同一点都会耗尽内存。
我忽略了什么?我应该以某种方式刷新内存吗?或者 SQL Server 是否正在等待提交事务直到连接关闭?
这是我当前的代码:
`import os
import glob
import traceback
import pandas as pd
import pyodbc
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from tqdm import tqdm
# Replace the following variables with your database connection details
DB_USERNAME = '-----'
DB_PASSWORD = '-----'
DB_HOST = '------'
DB_NAME = '------'
DATA_DIRECTORY = "---------"
ERRORS_DIRECTORY = os.path.join(DATA_DIRECTORY, "errors")
def create_database_engine():
engine = create_engine(f"mssql+pyodbc://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}?driver=ODBC+Driver+17+for+SQL+Server", fast_executemany=True )
return engine
def batch_insert_to_database(engine, data, table, error_log_file):
# Insert the data into the database in batches
# The first batch will DROP ANY EXISTING TABLE IN THE DATABASE WITH THE SAME NAME
try:
data.to_sql(table, con=engine, index=False, if_exists='append')
except SQLAlchemyError as e:
error_message = f"Error during batch insertion: {e}"
print("error encountered")
with open(error_log_file, 'a') as error_log:
error_log.write(error_message + '\n')
traceback.print_exc(file=error_log)
return False
return True
def load_table_data(csv, table, errors_directory):
print(f"Beginning load for {table}")
# Create a database engine
engine = create_database_engine()
# Create an empty errors DataFrame to store failed batches
errors_df = pd.DataFrame()
# Batch size for insertion
batch_size = 100
# Initialize tqdm for progress tracking
progress_bar = tqdm(total=0, desc="Processing")
with engine.connect() as connection:
truncate_command = text(f"TRUNCATE TABLE [dbo].[{table}]")
connection.execute(truncate_command)
error_report_file = os.path.join(errors_directory, f"errors_{table}.txt")
# Read data from the CSV file in batches
for batch_data in pd.read_csv(csv, chunksize=batch_size):
# Try to insert the batch into the database
success = batch_insert_to_database(engine, batch_data, table, error_report_file)
# If the batch insertion fails, add the batch to the errors DataFrame
if not success:
errors_df = pd.concat([errors_df, batch_data])
# Update the progress bar
progress_bar.update(len(batch_data))
# Close the progress bar
progress_bar.close()
error_data_file = os.path.join(errors_directory, f"errors_{table}.csv")
# Save the errors DataFrame to a CSV file
if not errors_df.empty:
errors_df.to_csv(error_data_file, index=False)
print(f"Errors saved to {error_data_file}")
def main():
pattern = os.path.join(DATA_DIRECTORY, '**', '*.gz')
gz_files = glob.glob(pattern, recursive=True)
tables = [[file, file.split(os.path.sep)[-1].split(".")[0]] for file in gz_files]
for table_data in tables:
print(table_data)
load_table_data(table_data[0], table_data[1], ERRORS_DIRECTORY)
if __name__ == "__main__":
main()`
这是堆栈跟踪:
Traceback (most recent call last):
File "C:\...\main.py", line 97, in <module>
main()
File "C:\...\main.py", line 94, in main
load_table_data(table_data[0], table_data[1], ERRORS_DIRECTORY)
File "C:\...\main.py", line 66, in load_table_data
success = batch_insert_to_database(engine, batch_data, table, error_report_file)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\...\main.py", line 29, in batch_insert_to_database
data.to_sql(table, con=engine, index=False, if_exists='append')
File "C:\...\pandas\core\generic.py", line 3008, in to_sql
return sql.to_sql(
^^^^^^^^^^^
File "C:\...\pandas\io\sql.py", line 788, in to_sql
return pandas_sql.to_sql(
^^^^^^^^^^^^^^^^^^
File "C:\...\pandas\io\sql.py", line 1958, in to_sql
total_inserted = sql_engine.insert_records(
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\...\pandas\io\sql.py", line 1498, in insert_records
return table.insert(chunksize=chunksize, method=method)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\...\pandas\io\sql.py", line 1059, in insert
num_inserted = exec_insert(conn, keys, chunk_iter)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\...\pandas\io\sql.py", line 951, in _execute_insert
result = conn.execute(self.table.insert(), data)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\...\sqlalchemy\engine\base.py", line 1412, in execute
return meth(
^^^^^
File "C:\...\sqlalchemy\sql\elements.py", line 516, in _execute_on_connection
return connection._execute_clauseelement(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\...\sqlalchemy\engine\base.py", line 1635, in _execute_clauseelement
ret = self._execute_context(
^^^^^^^^^^^^^^^^^^^^^^
File "C:\...\sqlalchemy\engine\base.py", line 1844, in _execute_context
return self._exec_single_context(
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\...\sqlalchemy\engine\base.py", line 1984, in _exec_single_context
self._handle_dbapi_exception(
File "C:\...\sqlalchemy\engine\base.py", line 2342, in _handle_dbapi_exception
raise exc_info[1].with_traceback(exc_info[2])
File "C:\...\sqlalchemy\engine\base.py", line 1934, in _exec_single_context
self.dialect.do_executemany(
File "C:\...\sqlalchemy\dialects\mssql\pyodbc.py", line 716, in do_executemany
super().do_executemany(cursor, statement, parameters, context=context)
File "C:\...\sqlalchemy\engine\default.py", line 918, in do_executemany
cursor.executemany(statement, parameters)
错误消息只是“MemoryError”。
我不太了解 sqlserver,但也许可以尝试这个来查看整个导入是否试图堆积在单个事务中,可能有更好的方法,但这会提供信息,(这将提交每批):
def batch_insert_to_database(engine, data, table, error_log_file):
with engine.begin() as conn:
data.to_sql(table, con=conn, index=False, if_exists='append')
return True