使用 SQLAlchemy 将 CSV 文件加载到数据库中

问题描述 投票:0回答:6

如何将 CSV 文件加载到数据库中?

python database sqlalchemy
6个回答
61
投票

如果您的 CSV 非常大,使用 INSERTS 的效率非常低。您应该使用批量加载机制,该机制因基础而异。例如。在 PostgreSQL 中你应该使用“COPY FROM”方法:

with open(csv_file_path, 'r') as f:    
    conn = create_engine('postgresql+psycopg2://...').raw_connection()
    cursor = conn.cursor()
    cmd = 'COPY tbl_name(col1, col2, col3) FROM STDIN WITH (FORMAT CSV, HEADER FALSE)'
    cursor.copy_expert(cmd, f)
    conn.commit()

59
投票

由于 SQLAlchemy 的强大功能,我也在一个项目中使用它。它的强大之处在于以面向对象的方式与数据库“对话”,而不是硬编码难以管理的 SQL 语句。更不用说,它也快了很多。

坦白地回答你的问题,是的!使用 SQLAlchemy 将 CSV 中的数据存储到数据库中是小菜一碟。这是一个完整的工作示例(我使用 SQLAlchemy 1.0.6 和 Python 2.7.6):

from numpy import genfromtxt
from time import time
from datetime import datetime
from sqlalchemy import Column, Integer, Float, Date
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

def Load_Data(file_name):
    data = genfromtxt(file_name, delimiter=',', skip_header=1, converters={0: lambda s: str(s)})
    return data.tolist()

Base = declarative_base()

class Price_History(Base):
    #Tell SQLAlchemy what the table name is and if there's any table-specific arguments it should know about
    __tablename__ = 'Price_History'
    __table_args__ = {'sqlite_autoincrement': True}
    #tell SQLAlchemy the name of column and its attributes:
    id = Column(Integer, primary_key=True, nullable=False) 
    date = Column(Date)
    opn = Column(Float)
    hi = Column(Float)
    lo = Column(Float)
    close = Column(Float)
    vol = Column(Float)

if __name__ == "__main__":
    t = time()

    #Create the database
    engine = create_engine('sqlite:///csv_test.db')
    Base.metadata.create_all(engine)

    #Create the session
    session = sessionmaker()
    session.configure(bind=engine)
    s = session()

    try:
        file_name = "t.csv" #sample CSV file used:  http://www.google.com/finance/historical?q=NYSE%3AT&ei=W4ikVam8LYWjmAGjhoHACw&output=csv
        data = Load_Data(file_name) 

        for i in data:
            record = Price_History(**{
                'date' : datetime.strptime(i[0], '%d-%b-%y').date(),
                'opn' : i[1],
                'hi' : i[2],
                'lo' : i[3],
                'close' : i[4],
                'vol' : i[5]
            })
            s.add(record) #Add all the records

        s.commit() #Attempt to commit all the records
    except:
        s.rollback() #Rollback the changes on error
    finally:
        s.close() #Close the connection
    print "Time elapsed: " + str(time() - t) + " s." #0.091s

(注意:这不一定是“最佳”方法,但我认为这种格式对于初学者来说非常易读;它也非常快:插入 251 条记录只需 0.091 秒!)

我想如果你逐行浏览它,你就会发现它使用起来是多么轻松。请注意缺少 SQL 语句——万岁!我还冒昧地使用 numpy 在两行中加载 CSV 内容,但如果您愿意,也可以不使用它。

如果您想与传统的做法进行比较,这里有一个完整的工作示例供参考:

import sqlite3
import time
from numpy import genfromtxt

def dict_factory(cursor, row):
    d = {}
    for idx, col in enumerate(cursor.description):
        d[col[0]] = row[idx]
    return d


def Create_DB(db):      
    #Create DB and format it as needed
    with sqlite3.connect(db) as conn:
        conn.row_factory = dict_factory
        conn.text_factory = str

        cursor = conn.cursor()

        cursor.execute("CREATE TABLE [Price_History] ([id] INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL UNIQUE, [date] DATE, [opn] FLOAT, [hi] FLOAT, [lo] FLOAT, [close] FLOAT, [vol] INTEGER);")


def Add_Record(db, data):
    #Insert record into table
    with sqlite3.connect(db) as conn:
        conn.row_factory = dict_factory
        conn.text_factory = str

        cursor = conn.cursor()

        cursor.execute("INSERT INTO Price_History({cols}) VALUES({vals});".format(cols = str(data.keys()).strip('[]'), 
                    vals=str([data[i] for i in data]).strip('[]')
                    ))


def Load_Data(file_name):
    data = genfromtxt(file_name, delimiter=',', skiprows=1, converters={0: lambda s: str(s)})
    return data.tolist()


if __name__ == "__main__":
    t = time.time() 

    db = 'csv_test_sql.db' #Database filename 
    file_name = "t.csv" #sample CSV file used:  http://www.google.com/finance/historical?q=NYSE%3AT&ei=W4ikVam8LYWjmAGjhoHACw&output=csv

    data = Load_Data(file_name) #Get data from CSV

    Create_DB(db) #Create DB

    #For every record, format and insert to table
    for i in data:
        record = {
                'date' : i[0],
                'opn' : i[1],
                'hi' : i[2],
                'lo' : i[3],
                'close' : i[4],
                'vol' : i[5]
            }
        Add_Record(db, record)

    print "Time elapsed: " + str(time.time() - t) + " s." #3.604s

(注意:即使采用“旧”方式,这也绝不是最好的方式,但它非常可读,并且是 SQLAlchemy 方式与“旧”方式的“一对一”翻译。 )

注意SQL语句:一个创建表,另一个插入记录。另外,请注意,与简单的类属性添加相比,维护长 SQL 字符串要麻烦一些。到目前为止喜欢 SQLAlchemy 吗?

当然,至于您的外键查询。 SQLAlchemy 也有能力做到这一点。下面是一个带有外键赋值的类属性的示例(假设

ForeignKey
类也已从
sqlalchemy
模块导入):

class Asset_Analysis(Base):
    #Tell SQLAlchemy what the table name is and if there's any table-specific arguments it should know about
    __tablename__ = 'Asset_Analysis'
    __table_args__ = {'sqlite_autoincrement': True}
    #tell SQLAlchemy the name of column and its attributes:
    id = Column(Integer, primary_key=True, nullable=False) 
    fid = Column(Integer, ForeignKey('Price_History.id'))

它将“fid”列作为外键指向 Price_History 的 id 列。

希望有帮助!


22
投票

我遇到了完全相同的问题,而且我发现使用 pandas 的两步过程更容易:

import pandas as pd
with open(csv_file_path, 'r') as file:
    data_df = pd.read_csv(file)
data_df.to_sql('tbl_name', con=engine, index=True, index_label='id', if_exists='replace')

请注意,我的方法类似于这个,但不知何故谷歌将我发送到这个线程,所以我想我会分享。


8
投票

要使用 sqlalchemy 将相对较小的 CSV 文件导入数据库,您可以使用

engine.execute(my_table.insert(), list_of_row_dicts)
,如 sqlalchemy 教程“执行多个语句”部分中详细描述。

这有时被称为

“executemany”调用风格,因为它会导致 executemany

 DBAPI 调用
。数据库驱动程序可能会执行单个多值INSERT .. VALUES (..), (..), (..)
语句,这会减少与数据库的往返次数并加快执行速度:

  • MySQL 连接器默认执行此操作
  • Postgres 的 psycopg2
  • 不会,除非你用 create_engine(...,executemany_mode='values')) 初始化它 与 MS SQL Server 的 ODBC 驱动程序一起使用时,
  • pyodbc 的
  • fast_executemany 标志。 (但是不是 pymssql!)
根据

sqlalchemy的FAQ,这是在不使用特定于数据库的批量加载方法的情况下可以获得的最快速度,例如Postgres中的COPY FROM,MySQL中的LOAD DATA LOCAL INFILE等。特别是它比使用普通的 ORM(如@Manuel J. Diaz 的回答)、bulk_save_objects

bulk_insert_mappings

import csv from sqlalchemy import create_engine, Table, Column, Integer, MetaData engine = create_engine('sqlite:///sqlalchemy.db', echo=True) metadata = MetaData() # Define the table with sqlalchemy: my_table = Table('MyTable', metadata, Column('foo', Integer), Column('bar', Integer), ) metadata.create_all(engine) insert_query = my_table.insert() # Or read the definition from the DB: # metadata.reflect(engine, only=['MyTable']) # my_table = Table('MyTable', metadata, autoload=True, autoload_with=engine) # insert_query = my_table.insert() # Or hardcode the SQL query: # insert_query = "INSERT INTO MyTable (foo, bar) VALUES (:foo, :bar)" with open('test.csv', 'r', encoding="utf-8") as csvfile: csv_reader = csv.reader(csvfile, delimiter=',') engine.execute( insert_query, [{"foo": row[0], "bar": row[1]} for row in csv_reader] )
    

3
投票
带有逗号和标题名称的 CSV 文件到 PostrgeSQL

    我正在使用 csv Python 阅读器。 CSV 数据除以逗号 (,)
  1. 然后将其转换为 Pandas DataFrame。列的名称与 csv 文件中的名称相同。
  2. 最后,DataFrame转sql,引擎作为DB的连接。 if_exists='替换/追加'
import csv import pandas as pd from sqlalchemy import create_engine # Create engine to connect with DB try: engine = create_engine( 'postgresql://username:password@localhost:5432/name_of_base') except: print("Can't create 'engine") # Get data from CSV file to DataFrame(Pandas) with open('test.csv', newline='') as csvfile: reader = csv.DictReader(csvfile) columns = ['one', 'two', 'three'] df = pd.DataFrame(data=reader, columns=columns) # Standart method of Pandas to deliver data from DataFrame to PastgresQL try: with engine.begin() as connection: df.to_sql('name_of_table', con=connection, index_label='id', if_exists='replace') print('Done, ok!') except Exception as e: print(e)
    

1
投票
这是我让它发挥作用的唯一方法。其他答案没有明确提交光标的连接。这也意味着您正在使用现代 python、sqlalchemy 和显然 postgres,因为语法使用

COPY ... FROM

没有错误处理,它可能不安全,并且它使用 ORM 映射器定义中的所有非主键列,但对于简单的任务,它可能会做得很好。

import io import sqlalchemy Base: sqlalchemy.orm.DeclarativeMeta = db.orm.declarative_base() def upload_to_model_table( Model: Base, csv_stream: io.IOBase, engine: sqlalchemy.engine, header=True, delimiter=';' ): """ It's assumed you're using postgres, otherwise this won't work. """ fieldnames = ', '.join([ f'"{col.name}"' for col in Model.__mapper__.columns if not col.primary_key ]) sql = """ COPY {0} ({1}) FROM stdin WITH (format CSV, header {2}, delimiter '{3}') """.format(Model.__tablename__, fieldnames, header, delimiter) chunk_size = getattr(csv_stream, "_DEFAULT_CHUNK_SIZE", 1024) with engine.connect() as connection: cursor = connection.connection.cursor() cursor.copy_expert(sql, csv_stream, chunk_size) cursor.connection.commit() cursor.close()
    
© www.soinside.com 2019 - 2024. All rights reserved.