我有两个示例模型,如图所示。
class User(db.Model):
id = db.Column(....)
name = db.Column(....)
....
class Transaction(db.Model)
id = db.Column(....)
user_id = db.Column(ForeignKey('User.id').....)
....
我有一个将 user_id 作为参数的函数。
def add_transaction_for_user(user_id: int):
'''Does some processing, finds out if transactions
are missing and adds them if needed.
'''
# The part of the question the needs answering Q1 #
instance_foo = query_foo_for_user(user_id) # DB operations querying
missing_transactions = find_transactions_from_foo(foo.property) # non DB operations
if missing_transactions:
add_transactions(missing_transactions) # DB operations inserting
....
....
我有一个每天由 cronjob 调用的 API 路由。
@some_blueprint.route('/transactions/autoadd', methods=['POST'])
def autoadd():
'''Searches and adds missing transactions.
'''
all_user_ids: list[int] = query_all_users() # [1, 2, 3...]
# The part of the question the needs answering Q2 #
我有 1000 多个用户,按顺序执行此操作将花费数小时,这就是为什么我需要使用多线程来优化工作流程,因为每个用户都是独立的。 我有几个问题。
Q1。我可以简单地创建
session = Session()
一个 scoped_session 并将它传递给每个标有 #DB operations
的函数吗?因为我不能将所有底层代码放在一个函数中,所以我创建了一些小函数来完成特定的工作,但是没有一个示例作为参数传入会话,我是否必须在这个函数内部或外部调用 Session.remove()
一旦工作完成?
Q2。我知道我必须使用像
这样的东西executor = ThreadPoolExecutor(max_workers=4)
for user_id in all_user_ids:
executor.submit(auto_add_transaction_for_user, user_id=user_id)
但是我应该在路由中创建一个会话并为每个 user_id 传递
executor.submit(...., session=s,....)
吗?或者让每个线程创建它自己的?
Q3。在此上下文中使用多线程的最简单示例是什么?
提前感谢所有帮助,非常感谢。