我在 python 中使用 sqlite3 的行为非常奇怪。
我有一个小型烧瓶服务,有 2 个端点
addTask
和 deleteTask
。addTask
- 将新的随机 id(任务 id)推送到数组和数据库中,这是它的代码:端点:
@tasks_bp.route("/addTask", methods=["POST"])
def addTask():
hash = ''.join(secrets.choice(string.ascii_letters + string.digits + string.punctuation) for _ in range(10))
data = request.get_json()
return tasksService.addTask(hash)
服务:
def addTask(hash):
try:
conn = db.getConn()
new_task_id = random.randint(1, 999)
tasks_order = dbm.getTasksOrderList()
dbm.updateTasksOrderList(tasks_order + [new_task_id])
conn.commit()
tasks_order = dbm.getTasksOrderList()
print(hash, "added", new_task_id, datetime.now().strftime("%M:%S.%f"))
print(hash, tasks_order, datetime.now().strftime("%M:%S.%f"))
except Exception as e:
conn.rollback()
raise e
else:
return {"id": new_task_id}
dbm:
def getTasksOrderList():
cur = db.getConn().cursor()
row = cur.execute("SELECT order_list FROM tasks_order").fetchone()
order_list_str = row["order_list"]
return json.loads(order_list_str)
def updateTasksOrderList(order_list):
order_list_str = json.dumps(order_list)
cur = db.getConn().cursor()
cur.execute("UPDATE tasks_order SET order_list = ?", (order_list_str,))
请注意,我是 addid
hash
和一些仅用于调试方式的打印。deleteTask
- 接收任务 id,从数据库中选择 id 数组,删除此任务 id,然后更新数据库。这是它的代码:
端点:
@tasks_bp.route("/deleteTask", methods=["POST"])
def deleteTask():
try:
hash = ''.join(secrets.choice(string.ascii_letters + string.digits + string.punctuation) for _ in range(10))
data = request.get_json()
return tasksService.deleteTask(hash, data["task_id"])
except Exception as e:
print(hash, "ERROR OCCURED", data["task_id"])
服务:
def deleteTask(hash, task_id):
print(hash, "todelete", task_id, datetime.now().strftime("%M:%S.%f"))
try:
conn = db.getConn()
tasks_order = dbm.getTasksOrderList()
print(hash, tasks_order, task_id, datetime.now().strftime("%M:%S.%f"))
tasks_order.remove(task_id)
dbm.updateTasksOrderList(tasks_order)
conn.commit()
except Exception as e:
conn.rollback()
raise e
else:
return {}, 204
我按照 Flask 文档中的建议处理与数据库的连接。
def getConn():
db = getattr(g, '_database', None)
if db is None:
db = g._database = sqlite3.connect(DATABASE)
db.row_factory = sqlite3.Row
return db
@app.teardown_appcontext
def close_connection(exception):
db = getattr(g, '_database', None)
if db is not None:
db.close()
运行服务后,我在 postman 中运行性能测试,它从 20 个用户向
addTask
和 deleteTask
发送请求。它成功处理了一些添加和删除请求,然后失败了。deleteTask
端点失败。我在日志中看到的奇怪的事情是,在调用 deleteTask
之前发生了对数组(数据库)的插入,而且我实际上能够在日志中看到这一点。 日志:
?,PXYJ*pse added 237 02:51.805480
?,PXYJ*pse [54, 950, 86, 678, 417, 404, 237] 02:51.805480 # here we can see that on 02:51.805480 when I selected the array from the database, it included 237
t"qu5NI+bb todelete 237 02:51.903749
t"qu5NI+bb [54, 950, 86, 678, 417, 151] 237 02:51.905749 # here we can see that AFTER 02:51.805480 when I selected the array from the database, it doesn't include 237
t"qu5NI+bb ERROR OCCURED 237
您的代码存在竞争条件错误 - 如果两个请求同时更新数据库,您将丢失一些数据。
考虑这个基本的并发图(时间向下运行),其中
a
的写入完全丢失,因为请求 2 不知道它发生了。
假设“获取列表”表示“SELECT + 解析 JSON”,“添加到列表”是 Python 追加操作,“保存列表”是“转储 JSON 和 UPDATE 行”。
请求1 | 请求2 |
---|---|
获取列表->[] | |
添加一个到列表 -> [a] | 获取列表->[] |
保存列表[a] | 将 b 添加到列表 -> [b] |
保存列表[b] |
如果您想在当前的数据库设计中避免这种情况,则需要
SELECT order_list FROM tasks_order FOR UPDATE
(因此 SELECT FOR UPDATE
锁定行),并在同一事务中更新行。
也就是说,拥有一个单行单列的数据库表无论如何都是一种反模式。看起来你真的想要一个
CREATE TABLE tasks (id INTEGER PRIMARY KEY);
– 然后添加一个项目只是一个 INSERT INTO tasks (id) VALUES (123);
,删除是 DELETE FROM tasks WHERE id = 123;
等等,而且你根本不需要使用 JSON,而且你也不会'也不需要锁。
如果
tasks
的插入顺序很重要,您可以添加一个由数据库自动处理的时间戳列:
sqlite> CREATE TABLE tasks (id INTEGER PRIMARY KEY, insertion_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
sqlite> INSERT INTO tasks (id) VALUES (831);
sqlite> INSERT INTO tasks (id) VALUES (117);
sqlite> SELECT id FROM tasks ORDER by insertion_time;
831
117
sqlite> SELECT id FROM tasks ORDER by id;
117
831