我使用官方饼干切割机创建了一个示例金字塔应用程序来描述我的问题。我使用 MySQL 作为数据库服务器。
我的数据库模型是千篇一律的原始代码:
class MyModel(Base):
__tablename__ = 'models'
id = Column(Integer, primary_key=True)
name = Column(Text)
value = Column(Integer)
Index('my_index', MyModel.name, unique=True, mysql_length=255)
我有一个简单的帖子视图:
data = {"error": False, "message": ""}
if request.method == "POST":
dct = variable_decode(request.POST)
added, message = add_record(request, dct["code"], dct["name"])
data["error"] = not added
data["message"] = message
return {"data": data}
这是函数 add_record 的第一个版本:
try:
new_record = MyModel(id=code, name=name, value=1)
request.dbsession.add(new_record)
return True, ""
except IntegrityError:
message = "Code {} already exist".format(code)
print(message)
return False, message
在第一个版本中,如果我复制主键,代码将无法捕获异常。
...
File "/home/cquiros/temp/pyramid/pyramid/lib/python3.10/site-packages/pyramid_tm/__init__.py", line 82, in _finish
finisher()
...
sqlalchemy.exc.IntegrityError: (mysql.connector.errors.IntegrityError) 1062 (23000): Duplicate entry '1' for key 'models.PRIMARY'
[SQL: INSERT INTO models (id, name, value) VALUES (%(id)s, %(name)s, %(value)s)]
[parameters: {'id': '1', 'name': 'd', 'value': 1}]
(Background on this error at: https://sqlalche.me/e/20/gkpj)
在该函数的第二个版本中,我刷新会话:
def add_record(request, code, name):
try:
new_record = MyModel(id=code, name=name, value=1)
request.dbsession.add(new_record)
request.dbsession.flush()
return True, ""
except IntegrityError:
message = "Code {} already exist".format(code)
print(message)
return False, message
如果我刷新会话并复制主键,代码可以捕获异常。但是,如果我使用同一会话添加另一条记录,例如:
add_record(request, 100, "test")
我收到以下错误,显示之前捕获的主键异常:
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (mysql.connector.errors.IntegrityError) 1062 (23000): Duplicate entry '1' for key 'models.PRIMARY'
[SQL: INSERT INTO models (id, name, value) VALUES (%(id)s, %(name)s, %(value)s)]
[parameters: {'id': '1', 'name': 'd', 'value': 1}]
该错误指向回滚。所以这是我的 add_record() 函数的版本 3,我在其中回滚会话:
def add_record(request, code, name):
try:
new_record = MyModel(id=code, name=name, value=1)
request.dbsession.add(new_record)
request.dbsession.flush()
return True, ""
except IntegrityError:
request.dbsession.rollback()
message = "Code {} already exist".format(code)
print(message)
return False, message
但是,如果我使用同一会话添加另一条记录,例如:
add_record(request, 300, "test")
我收到错误:
File "/home/cquiros/temp/pyramid/pyramid/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 943, in _raise_for_prerequisite_state
raise sa_exc.ResourceClosedError("This transaction is closed")
sqlalchemy.exc.ResourceClosedError: This transaction is closed
此时,我想知道代码是否不是控制重复主键的正确方法。但是,我无法在金字塔文档或互联网上的其他任何地方找到解决该问题的不同方法。
任何想法都值得赞赏。
我很确定默认的千篇一律使用
pyramid_tm
来管理应用程序范围的事务。 sqlalchemy/db 事务由这个更大的事务机制管理。其背后的想法是,您可以将其他项目(例如发送电子邮件、排队作业等)全部放入与数据库事务相同的“全有或全无”事务中。缺点是在处理数据库事务时会失去一些粒度。对于像这样的小案例,我认为失败并重试整个请求是没有意义的。
解决此问题的另一种可以说是更好的解决方案是使用
"upsert"
。 IE。尝试插入,如果失败,因为它将创建重复项,然后根据需要更新预先存在的记录。如果您需要手动处理提交/回滚过程,您仍然应该能够使用保存点。我认为 mysql
对这两者都有一定的支持。
您可以在此处的
pyramid_tm
文档中阅读有关保存点的信息:
保存点
您可以在
"upserts"
文档中阅读有关 sqlalchemy
的信息:orm-queryguide-upsert