MemoryError when bulk inserting CSV data into SQL Server with SQLAlchemy, pyodbc and MSSQL

As the title says, I'm using SQLAlchemy and pyodbc in a Python script to insert large CSV files (up to 14 GB) into a locally hosted SQL Server database.

I know I can't hold a 14 GB DataFrame in memory, so I use pandas' chunked reading to run the inserts in batches, and I've tried batch sizes as small as 100 rows, which fit in memory easily.

I have a feeling the memory error is on the SQL side. To minimize load-time overhead, the target table has no indexes (it's a heap). Regardless of the batch size, I run out of memory at the same point in the load.

What am I overlooking? Should I be flushing memory somehow? Or is SQL Server holding everything in one transaction and waiting to commit until the connection closes?

Here is my current code:

```python
import os
import glob
import traceback

import pandas as pd
import pyodbc
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from tqdm import tqdm

# Replace the following variables with your database connection details
DB_USERNAME = '-----'
DB_PASSWORD = '-----'
DB_HOST = '------'
DB_NAME = '------'

DATA_DIRECTORY = "---------"
ERRORS_DIRECTORY = os.path.join(DATA_DIRECTORY, "errors")


def create_database_engine():
    engine = create_engine(f"mssql+pyodbc://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}?driver=ODBC+Driver+17+for+SQL+Server", fast_executemany=True )
    return engine

def batch_insert_to_database(engine, data, table, error_log_file):
    # Insert the data into the database in batches
    # NOTE: with if_exists='append', to_sql never drops or replaces the table;
    # the target table is truncated once in load_table_data before the batches start
    try:
        data.to_sql(table, con=engine, index=False, if_exists='append')
    except SQLAlchemyError as e:
        error_message = f"Error during batch insertion: {e}"
        print("error encountered")
        
        with open(error_log_file, 'a') as error_log:
            error_log.write(error_message + '\n')
            traceback.print_exc(file=error_log)

        return False
    return True

def load_table_data(csv, table, errors_directory):
    
    print(f"Beginning load for {table}")
    
    # Create a database engine
    engine = create_database_engine()

    # Create an empty errors DataFrame to store failed batches
    errors_df = pd.DataFrame()

    # Batch size for insertion
    batch_size = 100

    # Initialize tqdm for progress tracking
    progress_bar = tqdm(total=0, desc="Processing")
    
    with engine.connect() as connection:
        truncate_command = text(f"TRUNCATE TABLE [dbo].[{table}]")
        connection.execute(truncate_command)

    error_report_file = os.path.join(errors_directory, f"errors_{table}.txt")
    # Read data from the CSV file in batches
    for batch_data in pd.read_csv(csv, chunksize=batch_size):
        # Try to insert the batch into the database
        success = batch_insert_to_database(engine, batch_data, table, error_report_file)

        # If the batch insertion fails, add the batch to the errors DataFrame
        if not success:
            errors_df = pd.concat([errors_df, batch_data])

        # Update the progress bar
        progress_bar.update(len(batch_data))

    # Close the progress bar
    progress_bar.close()


    error_data_file = os.path.join(errors_directory, f"errors_{table}.csv")
    # Save the errors DataFrame to a CSV file
    if not errors_df.empty:
        errors_df.to_csv(error_data_file, index=False)
        print(f"Errors saved to {error_data_file}")


def main():
    pattern = os.path.join(DATA_DIRECTORY, '**', '*.gz')
    gz_files = glob.glob(pattern, recursive=True)
    tables = [[file, file.split(os.path.sep)[-1].split(".")[0]] for file in gz_files]
    
    for table_data in tables:
        print(table_data)
        load_table_data(table_data[0], table_data[1], ERRORS_DIRECTORY)

if __name__ == "__main__":
    main()
```

Here is the stack trace:

```
Traceback (most recent call last):
  File "C:\...\main.py", line 97, in <module>
    main()
  File "C:\...\main.py", line 94, in main
    load_table_data(table_data[0], table_data[1], ERRORS_DIRECTORY)
  File "C:\...\main.py", line 66, in load_table_data
    success = batch_insert_to_database(engine, batch_data, table, error_report_file)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\...\main.py", line 29, in batch_insert_to_database
    data.to_sql(table, con=engine, index=False, if_exists='append')
  File "C:\...\pandas\core\generic.py", line 3008, in to_sql
    return sql.to_sql(
           ^^^^^^^^^^^
  File "C:\...\pandas\io\sql.py", line 788, in to_sql
    return pandas_sql.to_sql(
           ^^^^^^^^^^^^^^^^^^
  File "C:\...\pandas\io\sql.py", line 1958, in to_sql
    total_inserted = sql_engine.insert_records(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\...\pandas\io\sql.py", line 1498, in insert_records
    return table.insert(chunksize=chunksize, method=method)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\...\pandas\io\sql.py", line 1059, in insert
    num_inserted = exec_insert(conn, keys, chunk_iter)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\...\pandas\io\sql.py", line 951, in _execute_insert
    result = conn.execute(self.table.insert(), data)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\...\sqlalchemy\engine\base.py", line 1412, in execute
    return meth(
           ^^^^^
  File "C:\...\sqlalchemy\sql\elements.py", line 516, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\...\sqlalchemy\engine\base.py", line 1635, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\...\sqlalchemy\engine\base.py", line 1844, in _execute_context
    return self._exec_single_context(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\...\sqlalchemy\engine\base.py", line 1984, in _exec_single_context
    self._handle_dbapi_exception(
  File "C:\...\sqlalchemy\engine\base.py", line 2342, in _handle_dbapi_exception
    raise exc_info[1].with_traceback(exc_info[2])
  File "C:\...\sqlalchemy\engine\base.py", line 1934, in _exec_single_context
    self.dialect.do_executemany(
  File "C:\...\sqlalchemy\dialects\mssql\pyodbc.py", line 716, in do_executemany
    super().do_executemany(cursor, statement, parameters, context=context)
  File "C:\...\sqlalchemy\engine\default.py", line 918, in do_executemany
    cursor.executemany(statement, parameters)
```

The error message is simply "MemoryError".
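
For reference, one way to test the open-transaction theory directly might be something like the sketch below (untested; it reuses create_database_engine() from the script above, and the exact DMV query is my own guess): run it from a second Python process while the loader is working and see whether the loader's session keeps a single transaction open the whole time.

```python
from sqlalchemy import text

# Untested diagnostic sketch: reuses create_database_engine() from the script above.
# sys.dm_tran_session_transactions / sys.dm_tran_active_transactions are standard
# SQL Server DMVs (they require VIEW SERVER STATE permission); the join below is my
# assumption about the minimal useful output.
engine = create_database_engine()

with engine.connect() as conn:
    rows = conn.execute(text("""
        SELECT st.session_id,
               at.transaction_id,
               at.transaction_begin_time
        FROM sys.dm_tran_session_transactions AS st
        JOIN sys.dm_tran_active_transactions AS at
          ON at.transaction_id = st.transaction_id
    """)).fetchall()

for row in rows:
    print(f"session {row.session_id}: transaction {row.transaction_id} "
          f"open since {row.transaction_begin_time}")
```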

python sql-server memory-management sqlalchemy pyodbc
1 Answer

I don't know SQL Server that well, but maybe try this to see whether the whole import is piling up in a single transaction. There may be a better way, but this should at least tell you something (it commits each batch):

```python
def batch_insert_to_database(engine, data, table, error_log_file):
    # engine.begin() opens a transaction that is committed as soon as the
    # block exits, so every batch is committed immediately instead of
    # accumulating in one long-running transaction
    with engine.begin() as conn:
        data.to_sql(table, con=conn, index=False, if_exists='append')
    return True
```
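
If committing each batch does not change anything, another variable worth isolating (my assumption, not something confirmed in this thread) is fast_executemany=True on the engine: with it enabled, pyodbc packs the parameters for the whole batch into an in-memory buffer before sending it, which can get large with wide text columns. A quick experiment is to rebuild the engine without that flag, for example:

```python
from sqlalchemy import create_engine

# Hypothetical experiment, not part of the fix above: create the engine with
# fast_executemany left at its default (off), so pyodbc binds parameters row by
# row instead of buffering the whole batch in one array.
# DB_USERNAME / DB_PASSWORD / DB_HOST / DB_NAME are the placeholders from the question.
def create_database_engine_slow():
    return create_engine(
        f"mssql+pyodbc://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}"
        "?driver=ODBC+Driver+17+for+SQL+Server"
    )
```

If the loader then gets past the point where it used to die, the memory pressure is coming from pyodbc's batch parameter buffering rather than from an uncommitted transaction.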