Bulk Insert a Pandas DataFrame Using SQLAlchemy

Question · 35 votes · 10 answers

I have some rather large pandas DataFrames and I'd like to use the new bulk SQL mappings to upload them to Microsoft SQL Server via SQLAlchemy. The pandas.to_sql method, while nice, is slow.

I am having trouble writing the code...

I want to be able to pass this function a pandas DataFrame which I'm calling table, a schema name which I'm calling schema, and a table name which I'm calling name. Ideally, the function will 1.) drop the table if it already exists, 2.) create a new table, 3.) create a mapper, and 4.) bulk insert using the mapper and the pandas data. I'm stuck on part 3.

Here is my (admittedly crude) code. I'm struggling to get the mapper to work with a primary key. I don't really need a primary key, but the mapper function requires one.

Thanks for your insight.

from sqlalchemy import create_engine, Table, Column, MetaData
from sqlalchemy.orm import mapper, create_session
from sqlalchemy.ext.declarative import declarative_base
from pandas.io.sql import SQLTable, SQLDatabase

def bulk_upload(table, schema, name):
    e = create_engine('mssql+pyodbc://MYDB')
    s = create_session(bind=e)
    m = MetaData(bind=e,reflect=True,schema=schema)
    Base = declarative_base(bind=e,metadata=m)
    t = Table(name,m)
    m.remove(t)
    t.drop(checkfirst=True)
    sqld = SQLDatabase(e, schema=schema,meta=m)
    sqlt = SQLTable(name, sqld, table).table
    sqlt.metadata = m
    m.create_all(bind=e,tables=[sqlt])    
    class MyClass(Base):
        pass
    mapper(MyClass, sqlt)    

    s.bulk_insert_mappings(MyClass, table.to_dict(orient='records'))
    return
python pandas sqlalchemy
10 Answers
28 votes

I had a similar problem, with pd.to_sql taking hours to upload data. The code below bulk-inserted the same data in a few seconds.

from sqlalchemy import create_engine
import psycopg2 as pg
#load python script that batch loads pandas df to sql
import cStringIO

address = 'postgresql://<username>:<pswd>@<host>:<port>/<database>'
engine = create_engine(address)
connection = engine.raw_connection()
cursor = connection.cursor()

#df is the dataframe containing an index and the columns "Event" and "Day"
#create Index column to use as primary key
df.reset_index(inplace=True)
df.rename(columns={'index':'Index'}, inplace =True)

#create the table but first drop if it already exists
command = '''DROP TABLE IF EXISTS localytics_app2;
CREATE TABLE localytics_app2
(
"Index" serial primary key,
"Event" text,
"Day" timestamp without time zone,
);'''
cursor.execute(command)
connection.commit()

#stream the data using 'to_csv' and StringIO(); then use sql's 'copy_from' function
output = cStringIO.StringIO()
#ignore the index
df.to_csv(output, sep='\t', header=False, index=False)
#jump to start of stream
output.seek(0)
contents = output.getvalue()
cur = connection.cursor()
#null values become ''
cur.copy_from(output, 'localytics_app2', null="")    
connection.commit()
cur.close()
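
The snippet above targets Python 2 (cStringIO). On Python 3, the same streaming approach works with io.StringIO; a minimal sketch, assuming df, engine, and the localytics_app2 table exist as created above:

import io

output = io.StringIO()
df.to_csv(output, sep='\t', header=False, index=False)
#jump to start of stream before handing it to copy_from
output.seek(0)

connection = engine.raw_connection()
cursor = connection.cursor()
#copy_from streams the buffer through PostgreSQL's COPY protocol in a single round trip
cursor.copy_from(output, 'localytics_app2', null="")
connection.commit()
cursor.close()
connection.close()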

-3 votes

For anyone landing on this question whose target database is Redshift, note that Redshift does not implement the full set of Postgres commands, so some of the answers that use Postgres's COPY FROM or copy_from() will not work. See: psycopg2.ProgrammingError: syntax error at or near "stdin" error when trying to copy_from redshift


16 votes

This may already have been answered by now, but I found my solution by piecing together different answers on this site and aligning them with SQLAlchemy's documentation.

  1. The table must already exist in db1, with an index set up and auto_increment enabled.
  2. The class Current must align with the dataframe imported from the CSV and with the table in db1.

Hope this helps anyone who comes here and wants to mix Pandas and SQLAlchemy in a quick way.

from urllib import quote_plus as urlquote  # Python 2; on Python 3: from urllib.parse import quote_plus as urlquote
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Numeric
from sqlalchemy.orm import sessionmaker
import pandas as pd


# Set up of the engine to connect to the database
# the urlquote is used for passing the password which might contain special characters such as "/"
engine = create_engine('mysql://root:%s@localhost/db1' % urlquote('weirdPassword*withsp€cialcharacters'), echo=False)
conn = engine.connect()
Base = declarative_base()

#Declaration of the class in order to write into the database. This structure is standard and should align with SQLAlchemy's doc.
class Current(Base):
    __tablename__ = 'tableName'

    id = Column(Integer, primary_key=True)
    Date = Column(String(500))
    Type = Column(String(500))
    Value = Column(Numeric())

    def __repr__(self):
        return "(id='%s', Date='%s', Type='%s', Value='%s')" % (self.id, self.Date, self.Type, self.Value)

# Set up of the table in db and the file to import
fileToRead = 'file.csv'
tableToWriteTo = 'tableName'

# Panda to create a lovely dataframe
df_to_be_written = pd.read_csv(fileToRead)
# orient='records' is the key here: it produces the list-of-dicts format the docs describe for bulk inserts.
listToWrite = df_to_be_written.to_dict(orient='records')

metadata = sqlalchemy.schema.MetaData(bind=engine,reflect=True)
table = sqlalchemy.Table(tableToWriteTo, metadata, autoload=True)

# Open the session
Session = sessionmaker(bind=engine)
session = Session()

# Insert the dataframe into the database in one bulk operation
conn.execute(table.insert(), listToWrite)

# Commit the changes
session.commit()

# Close the session
session.close()

15 votes

Based on @ansonw's answer:

import cStringIO  # Python 2; on Python 3 use io.StringIO instead

def to_sql(engine, df, table, if_exists='fail', sep='\t', encoding='utf8'):
    # Create Table
    df[:0].to_sql(table, engine, if_exists=if_exists)

    # Prepare data
    output = cStringIO.StringIO()
    df.to_csv(output, sep=sep, header=False, encoding=encoding)
    output.seek(0)

    # Insert data
    connection = engine.raw_connection()
    cursor = connection.cursor()
    cursor.copy_from(output, table, sep=sep, null='')
    connection.commit()
    cursor.close()

This inserted 200,000 rows for me in 5 seconds instead of 4 minutes.
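
A hypothetical usage sketch (the connection string, table name, and example dataframe below are placeholders, not from the original answer):

from sqlalchemy import create_engine
import pandas as pd

engine = create_engine('postgresql://user:password@localhost:5432/mydb')
df = pd.DataFrame({'Event': ['click', 'view'], 'Day': ['2019-01-01', '2019-01-02']})

# Creates the table from the empty slice, then streams the rows with copy_from
to_sql(engine, df, 'events', if_exists='replace')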


4 votes

Pandas 0.25.1 has a parameter for doing multi-row inserts, so it's no longer necessary to work around this problem with SQLAlchemy.

Set method='multi' when calling pandas.DataFrame.to_sql.

In this example it would be df.to_sql(table, schema=schema, con=e, index=False, if_exists='replace', method='multi')

Answer sourced from the documentation here.

Of note, I have only tested this with Redshift. Please let me know how it goes on other databases so I can update this answer.
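
A fuller sketch of the same call, with a placeholder connection string and example names standing in for e, table, and schema:

import pandas as pd
from sqlalchemy import create_engine

e = create_engine('postgresql://user:password@localhost:5432/mydb')
df = pd.DataFrame({'Event': ['click', 'view'], 'Day': ['2019-01-01', '2019-01-02']})

# method='multi' packs many rows into each INSERT statement;
# chunksize caps how many rows go into a single statement
df.to_sql('events', schema='public', con=e, index=False,
          if_exists='replace', method='multi', chunksize=1000)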


3 votes

My postgres-specific solution below auto-creates the database table using your pandas dataframe, and performs a fast bulk insert using postgres COPY my_table FROM ...

import io

import pandas as pd
from sqlalchemy import create_engine

def write_to_table(df, db_engine, schema, table_name, if_exists='fail'):
    string_data_io = io.StringIO()
    df.to_csv(string_data_io, sep='|', index=False)
    pd_sql_engine = pd.io.sql.pandasSQL_builder(db_engine, schema=schema)
    table = pd.io.sql.SQLTable(table_name, pd_sql_engine, frame=df,
                               index=False, if_exists=if_exists, schema=schema)
    table.create()
    string_data_io.seek(0)
    string_data_io.readline()  # remove header
    with db_engine.connect() as connection:
        with connection.connection.cursor() as cursor:
            copy_cmd = "COPY %s.%s FROM STDIN HEADER DELIMITER '|' CSV" % (schema, table_name)
            cursor.copy_expert(copy_cmd, string_data_io)
        connection.connection.commit()
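
A hypothetical usage sketch, reusing the imports above (connection string, schema, and table name are placeholders):

engine = create_engine('postgresql://user:password@localhost:5432/mydb')
df = pd.DataFrame({'Event': ['click', 'view'], 'Day': ['2019-01-01', '2019-01-02']})

# Creates public.events from the dataframe's dtypes, then streams the rows via COPY
write_to_table(df, engine, 'public', 'events', if_exists='replace')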

2 votes

Since this is an I/O-heavy workload, you can also use the python threading module via multiprocessing.dummy. This sped things up for me.
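
A minimal sketch of one way such a threaded split could look (the connection string, table name, and thread count below are assumptions, not the answerer's original code):

from multiprocessing.dummy import Pool  # thread-based pool with the multiprocessing API

import numpy as np
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('postgresql://user:password@localhost:5432/mydb')

def insert_chunk(chunk):
    # Each thread appends its own slice of the dataframe to the same table
    chunk.to_sql('my_table', engine, if_exists='append', index=False)

def threaded_to_sql(df, n_threads=4):
    # Split the dataframe and let the threads overlap their I/O waits
    chunks = np.array_split(df, n_threads)
    with Pool(n_threads) as pool:
        pool.map(insert_chunk, chunks)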


2 votes

For those, like me, who are trying to implement the aforementioned solutions.


2 votes

Here is a simple method.


0 votes

This worked for me, connecting to an Oracle database using cx_Oracle and SQLAlchemy.
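
A minimal sketch of how that combination is typically wired up (host, service name, credentials, and table name are placeholders, not the answerer's original code):

import pandas as pd
from sqlalchemy import create_engine

# cx_Oracle is used implicitly through the 'oracle+cx_oracle' dialect
engine = create_engine('oracle+cx_oracle://user:password@host:1521/?service_name=MYSERVICE')

df = pd.DataFrame({'EVENT': ['click', 'view'], 'DAY': ['2019-01-01', '2019-01-02']})

# chunksize keeps each batch of bound INSERT statements to a manageable size
df.to_sql('my_table', engine, if_exists='replace', index=False, chunksize=1000)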
