使用 sqlalchemy 和 flask 的多线程

问题描述 投票:0回答:0

我有两个示例模型,如图所示。

class User(db.Model):
    id   = db.Column(....)
    name = db.Column(....)
    ....

class Transaction(db.Model)
    id = db.Column(....)
    user_id = db.Column(ForeignKey('User.id').....)
    ....

我有一个将 user_id 作为参数的函数。

def add_transaction_for_user(user_id: int):
    '''Does some processing, finds out if transactions
    are missing and adds them if needed.
    '''
    # The part of the question the needs answering Q1 #

    instance_foo = query_foo_for_user(user_id) # DB operations querying
    missing_transactions = find_transactions_from_foo(foo.property) # non DB operations
    if missing_transactions:
        add_transactions(missing_transactions) # DB operations inserting
    ....
    ....

我有一个每天由 cronjob 调用的 API 路由。

@some_blueprint.route('/transactions/autoadd', methods=['POST'])
def autoadd():
    '''Searches and adds missing transactions.
    '''
    all_user_ids: list[int] = query_all_users() # [1, 2, 3...]

    # The part of the question the needs answering Q2 #

我有 1000 多个用户,按顺序执行此操作将花费数小时,这就是为什么我需要使用多线程来优化工作流程,因为每个用户都是独立的。 我有几个问题。

Q1。我可以简单地创建

session = Session()
一个 scoped_session 并将它传递给每个标有
#DB operations
的函数吗?因为我不能将所有底层代码放在一个函数中,所以我创建了一些小函数来完成特定的工作,但是没有一个示例作为参数传入会话,我是否必须在这个函数内部或外部调用
Session.remove()
一旦工作完成?

Q2。我知道我必须使用像

这样的东西
executor = ThreadPoolExecutor(max_workers=4)
for user_id in all_user_ids:
    executor.submit(auto_add_transaction_for_user, user_id=user_id)

但是我应该在路由中创建一个会话并为每个 user_id 传递

executor.submit(...., session=s,....)
吗?或者让每个线程创建它自己的?

Q3。在此上下文中使用多线程的最简单示例是什么?

提前感谢所有帮助,非常感谢。

python-3.x flask sqlalchemy flask-sqlalchemy python-multithreading
© www.soinside.com 2019 - 2024. All rights reserved.