我在尝试使用 sqlalchemy 的bulk_insert_mappings 时遇到了困难。我知道我可以创建会话并连接到数据库。我已经初始化了我的引擎,但我似乎无法从表中获取我需要的映射器。
from sqlalchemy import create_engine
from sqlalchemy.orm.session import sessionmaker,Session
from sqlalchemy_utils import get_mapper
engine = create_engine('mysql+pymysql://{}:{}@IP:PORT/'.format(USER,PW)) # removed my config here
connection = engine.connect()
m = MetaData(bind=engine,schema='test')
m.reflect()
Session = sessionmaker(bind=engine)
s = Session()
s.bulk_insert_mappings(get_mapper(m.tables['test.test']), pd.DataFrame({'a':['a','b','c']}).to_dict(orient="records"))
s.commit()
s.close()
我最近在这个问题上发现了一堆相关问题
SQLAlchemy 从表对象(从元数据或会话或其他方式)获取 Mapper 对象
但是 sqlalchemy_utils.get_mapper 引发:
“ValueError:无法获取表“test”的映射器。”
sqlalchemy.orm.mapperlib._mapper_registry
似乎是空的。也许是因为我没有将它绑定到我的引擎上。但不知道该怎么做。
PS:测试是一个非常简单的TEXT类型的单列表
这是 m.tables['test.test'] 的输出
Table('test', MetaData(bind=Engine(mysql+pymysql://USER:***@IP:PORT/)), Column('a', TEXT(), table=<test>), schema='test')
Mapper
的工作是:
定义类属性与数据库表列的相关性。
...它是 SQLAlchemy ORM 的基础。使用 ORM,Python 类代表数据库中的表,并且需要某种机制将类上的属性与表中的列相关联。如果您不使用 ORM,您的表不会映射到 Python 类,因此不会使用映射器。这就是为什么您会收到
get_mapper()
错误。
在你的例子中:
m = MetaData(bind=engine,schema='test')
m.reflect()
MetaData
是:
对象及其关联模式构造的集合。Table
MetaData.reflect
:
自动在此
中为数据库中可用但尚未出现在Table
中的任何表创建MetaData
条目。MetaData
因此,此时,您有一组
Table
对象,并且您想要对其中一个对象执行批量插入。不要将 Table
对象与 ORM 映射类混淆,它们不是同一件事。
bulk_insert_mappings
上的文档指出:
执行给定映射字典列表的批量插入。
和
给定字典中的值通常不加修改地传递到 Core Insert() 构造中
您正在尝试实现数据的批量插入,我们可以跳过 ORM 方法(任何涉及
Session
的内容)并与 Core 显式交互。
表达式
pd.DataFrame({'a':['a','b','c']}).to_dict(orient="records")
返回 dict
的列表,例如:[{'a': 'a'}, {'a': 'b'}, {'a': 'c'}]
,因此为了简单起见,我将使用此处的示例输出。
您的元数据对象中有已使用
m.tables['test.test']
检索的表,并且该 Table
对象可用于生成自己的插入语句:
print(m.tables['test.test'].insert())
# INSERT INTO test.test (a) VALUES (%(a)s)
要执行多个语句,我们可以将字典列表传递给
Connection.execute()
,如下所示。
ORM
Session
的好处之一是它允许显式事务管理,您可以在必要时调用 Session.rollback()
或 Session.commit()
。连接对象还可以使用 Session
在类似于
Engine.begin()
的事务中显式操作。
例如,使用上下文管理器:
with engine.begin() as conn:
conn.execute(
m.tables['test.test'].insert(),
*[{'a': 'a'}, {'a': 'b'}, {'a': 'c'}]
)
如果上下文中没有错误,这将自动提交查询,如果有错误则回滚。
引擎日志显示此表达式发出以下查询:
INSERT INTO test.test (a) VALUES (%(a)s)
({'a': 'a'}, {'a': 'b'}, {'a': 'c'})
以下人为示例显示了使用
Session.bulk_insert_mappings()
的原始查询。我必须创建一个 ORM 模型来表示表并向表中添加一个 id
字段,因为 ORM 不喜欢在没有主键的情况下工作。
m = MetaData(bind=engine,schema='test')
Base = declarative_base(metadata=m)
class Test(Base):
__tablename__ = 'test'
id = Column(Integer, primary_key=True)
a = Column(Text)
Session = sessionmaker(bind=engine)
s = Session()
s.bulk_insert_mappings(get_mapper(m.tables['test.test']), pd.DataFrame({'a':['a','b','c']}).to_dict(orient="records"))
s.commit()
s.close()
这是从引擎日志中执行的查询:
INSERT INTO test.test (a) VALUES (%(a)s)
({'a': 'a'}, {'a': 'b'}, {'a': 'c'})
您会注意到,这与我们直接使用 Core 能够实现的查询完全相同。
我一直在谷歌上搜索完全相同的问题。不过,我找到了解决此问题的方法。
class Helper():
pass
new_mapper = sqlalchemy.orm.mapper(Helper, local_table = m.tables['test.test'])
session.bulk_insert_mappings(new_mapper,
df.to_dict(orient="records"), return_defaults = False)
session.commit()
session.close()
根据以下链接,我认为 df.to_sql 在将大量数据帧插入 sql 表中表现非常差。然而,事实证明,bulk_insert_mappings 要慢得多。 我希望它有帮助。
对于 SQLAlchemy 2.0+,来自 https://docs.sqlalchemy.org/en/20/changelog/whatsnew_20.html 大约在页面中间,在您的 Base 中定义:
@classmethod
def insert_many_objs(cls, obj_list: list):
"""bulk insert many objects at once.
param: obj_list is a list of the objects you want to insert"""
cls.session.add_all(obj_list)
cls.session.flush()
cls.session.commit()