SQLAlchemybulk_insert_mappings():无法获取表“test”的映射器

问题描述 投票:0回答:3

我在尝试使用 sqlalchemy 的bulk_insert_mappings 时遇到了困难。我知道我可以创建会话并连接到数据库。我已经初始化了我的引擎,但我似乎无法从表中获取我需要的映射器。

from sqlalchemy import create_engine
from sqlalchemy.orm.session import sessionmaker,Session
from sqlalchemy_utils import get_mapper

engine = create_engine('mysql+pymysql://{}:{}@IP:PORT/'.format(USER,PW)) # removed my config here
connection = engine.connect()
m = MetaData(bind=engine,schema='test')
m.reflect()

Session = sessionmaker(bind=engine)
s = Session()
s.bulk_insert_mappings(get_mapper(m.tables['test.test']), pd.DataFrame({'a':['a','b','c']}).to_dict(orient="records"))
s.commit()
s.close()

我最近在这个问题上发现了一堆相关问题

SQLAlchemy 从表对象(从元数据或会话或其他方式)获取 Mapper 对象

但是 sqlalchemy_utils.get_mapper 引发:

“ValueError:无法获取表“test”的映射器。”

sqlalchemy.orm.mapperlib._mapper_registry
似乎是空的。也许是因为我没有将它绑定到我的引擎上。但不知道该怎么做。

PS:测试是一个非常简单的TEXT类型的单列表

这是 m.tables['test.test'] 的输出

Table('test', MetaData(bind=Engine(mysql+pymysql://USER:***@IP:PORT/)), Column('a', TEXT(), table=<test>), schema='test')
python mysql sqlalchemy mariadb sqlalchemy-utils
3个回答
6
投票

SQLAlchemy

Mapper
的工作是:

定义类属性与数据库表列的相关性。

...它是 SQLAlchemy ORM 的基础。使用 ORM,Python 类代表数据库中的表,并且需要某种机制将类上的属性与表中的列相关联。如果您不使用 ORM,您的表不会映射到 Python 类,因此不会使用映射器。这就是为什么您会收到

get_mapper()
错误。

在你的例子中:

m = MetaData(bind=engine,schema='test')
m.reflect()

MetaData
是:

Table
对象及其关联模式构造的集合。

MetaData.reflect

自动在此

Table
中为数据库中可用但尚未出现在
MetaData
中的任何表创建
MetaData
条目。

因此,此时,您有一组

Table
对象,并且您想要对其中一个对象执行批量插入。不要将
Table
对象与 ORM 映射类混淆,它们不是同一件事。

bulk_insert_mappings
上的文档指出:

执行给定映射字典列表的批量插入。

给定字典中的值通常不加修改地传递到 Core Insert() 构造中

您正在尝试实现数据的批量插入,我们可以跳过 ORM 方法(任何涉及

Session
的内容)并与 Core 显式交互。

表达式

pd.DataFrame({'a':['a','b','c']}).to_dict(orient="records")
返回
dict
的列表,例如:
[{'a': 'a'}, {'a': 'b'}, {'a': 'c'}]
,因此为了简单起见,我将使用此处的示例输出。

您的元数据对象中有已使用

m.tables['test.test']
检索的表,并且该
Table
对象可用于生成自己的插入语句:

print(m.tables['test.test'].insert())
# INSERT INTO test.test (a) VALUES (%(a)s)

要执行多个语句,我们可以将字典列表传递给

Connection.execute()
,如下所示。

ORM

Session
的好处之一是它允许显式事务管理,您可以在必要时调用
Session.rollback()
Session.commit()
。连接对象还可以使用
Session
 在类似于 
Engine.begin()
的事务中显式操作。

例如,使用上下文管理器:

with engine.begin() as conn:
    conn.execute(
        m.tables['test.test'].insert(),
        *[{'a': 'a'}, {'a': 'b'}, {'a': 'c'}]
    )

如果上下文中没有错误,这将自动提交查询,如果有错误则回滚。

引擎日志显示此表达式发出以下查询:

INSERT INTO test.test (a) VALUES (%(a)s)
({'a': 'a'}, {'a': 'b'}, {'a': 'c'})

以下人为示例显示了使用

Session.bulk_insert_mappings()
的原始查询。我必须创建一个 ORM 模型来表示表并向表中添加一个
id
字段,因为 ORM 不喜欢在没有主键的情况下工作。

m = MetaData(bind=engine,schema='test')
Base = declarative_base(metadata=m)

class Test(Base):
    __tablename__ = 'test'
    id = Column(Integer, primary_key=True)
    a = Column(Text)


Session = sessionmaker(bind=engine)
s = Session()
s.bulk_insert_mappings(get_mapper(m.tables['test.test']), pd.DataFrame({'a':['a','b','c']}).to_dict(orient="records"))
s.commit()
s.close()

这是从引擎日志中执行的查询:

INSERT INTO test.test (a) VALUES (%(a)s)
({'a': 'a'}, {'a': 'b'}, {'a': 'c'})

您会注意到,这与我们直接使用 Core 能够实现的查询完全相同。


0
投票

我一直在谷歌上搜索完全相同的问题。不过,我找到了解决此问题的方法。

class Helper():
   pass
new_mapper = sqlalchemy.orm.mapper(Helper, local_table = m.tables['test.test'])
session.bulk_insert_mappings(new_mapper, 
df.to_dict(orient="records"), return_defaults = False)
session.commit()
session.close()

根据以下链接,我认为 df.to_sql 在将大量数据帧插入 sql 表中表现非常差。然而,事实证明,bulk_insert_mappings 要慢得多。 我希望它有帮助。


0
投票

对于 SQLAlchemy 2.0+,来自 https://docs.sqlalchemy.org/en/20/changelog/whatsnew_20.html 大约在页面中间,在您的 Base 中定义:

@classmethod
def insert_many_objs(cls, obj_list: list):
    """bulk insert many objects at once.
    param: obj_list is a list of the objects you want to insert"""
    cls.session.add_all(obj_list)
    cls.session.flush()
    cls.session.commit()
© www.soinside.com 2019 - 2024. All rights reserved.