如何使用sqlalchemy的on_conflict_do_update.returning返回更新后的值?

问题描述 投票:0回答:1

我正在尝试执行 upsert 语句并让查询返回更新后的值。在插入时,它工作正常,因为没有数据,但当有更新时,查询返回正在更新的旧数据。

这是我的 upsert 声明 -

def upsert(model, data, constraints):
    insert_stmt: Insert = insert(model).values(data)
    do_update_stmt = insert_stmt.on_conflict_do_update(
        index_elements=constraints,
        set_=data,
    )
    return do_update_stmt

我执行它并得到这样的值-

upsert_query = upsert(
            model,
            new_data,
            ["constraint"],
        )
        try:
            upsert_response = db.session.execute(
                upsert_query.returning(model)
            )
            updated_model = upsert_response.fetchone()[0]

问题是

updated_model
这里返回的是旧数据。我可以提交查询,它会在模型更改时为我提供更新的数据,但在我的特定用例中,我不想提交,直到运行上述代码后运行更多代码,并且如果以下代码无法执行我想要的回滚。不幸的是,我似乎无法在提交后进行回滚。

我的问题是;有没有办法从这里的响应中获取更新的数据而不是更新?如果没有,我怎样才能回滚这个提交?

db
是在应用程序中共享的
SqlAlchemy()
实例。

python postgresql sqlalchemy flask-sqlalchemy upsert
1个回答
0
投票

返回 ORM 对象时,您必须填充现有对象,否则它们将不会更新。

这里有一个例子:

使用带有upsert语句的返回

这是我制作的另一个例子

关键是

    res = session.execute(q, execution_options={"populate_existing": True}).fetchone()[0]
mport sys
from sqlalchemy import (
    create_engine,
    Integer,
    String,
)
from sqlalchemy.orm import Session, declarative_base, mapped_column
from sqlalchemy.sql import select
from sqlalchemy.dialects import postgresql

username, password, db = sys.argv[1:4]


engine = create_engine(f"postgresql+psycopg2://{username}:{password}@/{db}", echo=True)


Base = declarative_base()


Base.metadata.create_all(engine)


class User(Base):
    __tablename__ = "users"

    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String, unique=True)
    beverage = mapped_column(String, nullable=False)


Base.metadata.create_all(engine)


with Session(engine) as session:
    u1 = User(id=1, name="user1", beverage="coffee")
    u2 = User(id=2, name="user2", beverage="tea")
    session.add_all([u1, u2])
    session.commit()

def upsert(model, insert_data, update_data, index_elements):
    insert_stmt = postgresql.insert(model).values(insert_data)
    do_update_stmt = insert_stmt.on_conflict_do_update(
        index_elements=index_elements,
        set_=update_data,
    )
    return do_update_stmt


with Session(engine) as session:
    u1 = session.scalars(select(User).where(User.id == 1)).first()
    assert u1.beverage != 'water', "This should be the old value."
    data = dict(id=1, name="user1", beverage="water")
    q = upsert(User, data, dict((k, v) for (k, v) in data.items() if k != "id"), ["id"]).returning(User)

    res = session.execute(q, execution_options={"populate_existing": True}).fetchone()[0]
    assert res.beverage == 'water', "This should be the new value."
    assert u1.beverage == 'water', "Our pre-existing object should be the same object but check anyways."

    session.commit()

    # Now read it back and check again after commit.
    u1 = session.scalars(select(User).where(User.id == 1)).first()
    print (u1.beverage == 'water')
© www.soinside.com 2019 - 2024. All rights reserved.