我正在尝试执行 upsert 语句并让查询返回更新后的值。在插入时,它工作正常,因为没有数据,但当有更新时,查询返回正在更新的旧数据。
这是我的 upsert 声明 -
def upsert(model, data, constraints):
insert_stmt: Insert = insert(model).values(data)
do_update_stmt = insert_stmt.on_conflict_do_update(
index_elements=constraints,
set_=data,
)
return do_update_stmt
我执行它并得到这样的值-
upsert_query = upsert(
model,
new_data,
["constraint"],
)
try:
upsert_response = db.session.execute(
upsert_query.returning(model)
)
updated_model = upsert_response.fetchone()[0]
问题是
updated_model
这里返回的是旧数据。我可以提交查询,它会在模型更改时为我提供更新的数据,但在我的特定用例中,我不想提交,直到运行上述代码后运行更多代码,并且如果以下代码无法执行我想要的回滚。不幸的是,我似乎无法在提交后进行回滚。
我的问题是;有没有办法从这里的响应中获取更新的数据而不是更新?如果没有,我怎样才能回滚这个提交?
db
是在应用程序中共享的 SqlAlchemy()
实例。
返回 ORM 对象时,您必须填充现有对象,否则它们将不会更新。
这里有一个例子:
关键是
res = session.execute(q, execution_options={"populate_existing": True}).fetchone()[0]
mport sys
from sqlalchemy import (
create_engine,
Integer,
String,
)
from sqlalchemy.orm import Session, declarative_base, mapped_column
from sqlalchemy.sql import select
from sqlalchemy.dialects import postgresql
username, password, db = sys.argv[1:4]
engine = create_engine(f"postgresql+psycopg2://{username}:{password}@/{db}", echo=True)
Base = declarative_base()
Base.metadata.create_all(engine)
class User(Base):
__tablename__ = "users"
id = mapped_column(Integer, primary_key=True)
name = mapped_column(String, unique=True)
beverage = mapped_column(String, nullable=False)
Base.metadata.create_all(engine)
with Session(engine) as session:
u1 = User(id=1, name="user1", beverage="coffee")
u2 = User(id=2, name="user2", beverage="tea")
session.add_all([u1, u2])
session.commit()
def upsert(model, insert_data, update_data, index_elements):
insert_stmt = postgresql.insert(model).values(insert_data)
do_update_stmt = insert_stmt.on_conflict_do_update(
index_elements=index_elements,
set_=update_data,
)
return do_update_stmt
with Session(engine) as session:
u1 = session.scalars(select(User).where(User.id == 1)).first()
assert u1.beverage != 'water', "This should be the old value."
data = dict(id=1, name="user1", beverage="water")
q = upsert(User, data, dict((k, v) for (k, v) in data.items() if k != "id"), ["id"]).returning(User)
res = session.execute(q, execution_options={"populate_existing": True}).fetchone()[0]
assert res.beverage == 'water', "This should be the new value."
assert u1.beverage == 'water', "Our pre-existing object should be the same object but check anyways."
session.commit()
# Now read it back and check again after commit.
u1 = session.scalars(select(User).where(User.id == 1)).first()
print (u1.beverage == 'water')