使用sqlalchemy orm,对于数据库中的不同表,我具有不同的类。例如,以下是其中两个:
class PortfolioDim(SqlAlchemyBase):
__tablename__ = 'portfoliodim'
Id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
PortfolioId = sa.Column(sa.String, index=True)
CompositeFrenchName = sa.Column(sa.String, index=True)
StartDate = sa.Column(sa.DateTime)
EndDate = sa.Column(sa.DateTime)
Benchmark = sa.Column(sa.String)
ClientOrFundName = sa.Column(sa.String)
LastUpdated = sa.Column(sa.DateTime, default=datetime.datetime.now)
class ValueFact(SqlAlchemyBase):
__tablename__ = 'valuefact'
Id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
PortfolioId = sa.Column(sa.String, index=True)
Date = sa.Column(sa.DateTime, index=True)
Currency = sa.Column(sa.String, index=True)
OpeningValue = sa.Column(sa.Float)
ClosingValue = sa.Column(sa.Float)
CashFlow = sa.Column(sa.Float)
LastUpdated = sa.Column(sa.DateTime, default=datetime.datetime.now, index=True)
到目前为止,我编写了2个不同的函数来更新每个函数。分别:
def update_valuefact(connecstr, dropindic, filedata):
''' Update value fact table, where filedata
is a list of rows, using sqlalchemy orm
'''
dbsession.init_db(connecstr, dropindic,
tableclassunderfocus=ValueFact)
session = dbsession.create_session()
for row in filedata:
valeur = ValueFact(CashFlow=row[0],
ClosingValue= row[1],
Currency= row[2],
Date = row[3],
OpeningValue= row[4],
PortfolioId = row[5]
)
session.add(valeur)
session.commit()
def update_portfoliodim(connecstr, dropindic, filedata):
''' Update portfolio dimension table,
where filedata is a list of rows, using sqlalchemy orm
'''
dbsession.init_db(connecstr, dropindic,
tableclassunderfocus=PortfolioDim)
session = dbsession.create_session()
for row in filedata:
portefeuille = PortfolioDim(Benchmark= row[0],
ClientOrFundName = row[1],
CompositeFrenchName=row[2],
EndDate = row[3],
PortfolioId= row[4],
StartDate= row[5]
)
session.add(portefeuille)
session.commit()
假设数据库正在扩展,我想编写一个update_table函数。我当时想我可以给它传递一个字典,该字典列出各个表中的各个字段,例如:
tablefieldsdicos = {'portfoliodim': {'PortfolioId': 'str', 'CompositeFrenchName': 'str',
'StartDate': 'datetime', 'EndDate': 'datetime', 'ClientOrFundName': 'str',
'Benchmark': 'str'},
'valuefact': {'CashFlow': 'float', 'Currency': 'str',
'ClosingValue': 'float',
'Date': 'datetime', 'OpeningValue': 'float', 'PortfolioId': 'str'}}
def update_table(connecstr, dropindic, filedata, tableclassunderfocus,
tablefieldsdico):
''' update a single table
filedata: list of rows (loaded from a csv file)
tableclassunderfocus: class matching table in sa orm
tablefieldsdico: dictionary where the keys are the name of the fields in alphabetical order
(order matching the order of the columns in source file "filedata")
'''
dbsession.init_db(connecstr, dropindic, tableclassunderfocus)
session = dbsession.create_session()
colindices = []
fields = []
for k, field in enumerate(tablefieldsdico.keys()):
colindices.append(k)
fields.append(field)
for row in filedata:
portefeuille = tableclassunderfocus(Benchmark= row[0],
ClientOrFundName = row[1],
CompositeFrenchName=row[2],
EndDate = row[3],
PortfolioId= row[4],
StartDate= row[5]
)
session.add(portefeuille)
session.commit()
我想知道如何将列表字段和colindices作为参数传递给tableunderfocus所指向的类。字段中的项目应在此处替换Benchmark,ClientOrFundName,...,StartDate,但是对于不同的类/表,字段将有所不同(字段列表的长度将有所不同)。 colindices中的项目应替换行[*]中的索引。
有解决此问题的建议吗?包括编写这种功能的完全不同的方法吗?
首先,创建一个字典列表,其中键是表字段,值是数据源文件中一行的元素。然后将字典列表传递给sqlalchemy bulk_insert_mappings函数,如Ilja的注释所指出的。
def update_table(connecstr, dropindic, filedata, tableclassunderfocus, tablename):
''' update a single table
filedata: list of dictionaries
(1 dictionary: keys are table fields; values are elements in source data rows (e.g. loaded from a csv file))
tableclassunderfocus: class matching table in sa orm
tablename: name of table in db
'''
dbsession.init_db(True, connecstr, dropindic, tableclassunderfocus)
session = dbsession.create_session()
session.bulk_insert_mappings(tableclassunderfocus, filedata)
session.commit()