Concurrent.futures and SQLAlchemy benchmarks vs. synchronous code

Problem description

I have a project where I need to upload ~70 files to my Flask application. I'm learning about concurrency right now, so this seemed like perfect practice. When the database writes are replaced with print statements, the concurrent version of this function is roughly 2x to 2.5x faster than the synchronous one.

When actually writing to the SQLite database, however, both versions take roughly the same amount of time.

The original function:

@app.route('/test_sync')
def auto_add():

    t0 = time.time()

    # Code does not work without changing directory. Better option?
    # (See the path-handling sketch after this function.)
    os.chdir('my_app/static/tracks')

    # The working directory is now the tracks folder
    list_dir = os.listdir('.')

    # list_dir consists of .mp3 and .jpg files
    for filename in list_dir:
        if filename.endswith('.mp3'):
            try:
                thumbnail = [thumb for thumb in list_dir if thumb == filename[:-4] + '.jpg'][0]
            except IndexError:
                print(f'ERROR - COULD NOT FIND THUMB for { filename }')
                continue  # skip this track rather than crash on an unbound name

            resize_image(thumbnail)

            with open(filename, 'rb') as f, open(thumbnail, 'rb') as t:

                track = Track(

                    title=filename[15:-4],  # drop the first 15 characters and the '.mp3' extension
                    artist='Sam Gellaitry',
                    description='No desc.', 
                    thumbnail=t.read(),
                    binary_audio=f.read()
                )

        else:
            continue


        db.session.add(track)

    db.session.commit()
    elapsed = time.time() - t0

    return f'Uploaded all tracks in {elapsed} seconds.'
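
Regarding the "Better option?" comment: one alternative is to build absolute paths once and join filenames onto them, which avoids os.chdir (global, process-wide state that is awkward in a web app). A minimal sketch, assuming the directory layout above; track_path is a made-up helper name:

import os

# app.static_folder is Flask's absolute path to the static/ folder
TRACKS_DIR = os.path.join(app.static_folder, 'tracks')

def track_path(filename):
    # Join onto the absolute directory instead of chdir'ing into it
    return os.path.join(TRACKS_DIR, filename)

Inside the route, os.listdir(TRACKS_DIR) then replaces the chdir/listdir pair, and every open(filename) becomes open(track_path(filename)).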

The concurrent function:

@app.route('/test_concurrent')
def auto_add_concurrent():

    t0 = time.time()
    MAX_WORKERS = 40

    # Same working-directory trick as the synchronous version
    os.chdir('my_app/static/tracks')
    list_dir = os.listdir('.')
    mp3_list = [x for x in list_dir if x.endswith('.mp3')]

    with futures.ThreadPoolExecutor(MAX_WORKERS) as executor:
        res = executor.map(add_one_file, mp3_list)

    # add_one_file returns None when a thumbnail is missing
    for x in res:
        if x is not None:
            db.session.add(x)

    db.session.commit()
    elapsed = time.time() - t0

    return f'Uploaded all tracks in {elapsed} seconds.'

----- 

def add_one_file(filename):

    # The route has already chdir'ed into the tracks folder
    list_dir = os.listdir('.')

    try:
        thumbnail = [thumb for thumb in list_dir if thumb == filename[:-4] + '.jpg'][0]

    except IndexError:
        print(f'ERROR - COULD NOT FIND THUMB for { filename }')
        return None  # the caller filters these out

    resize_image(thumbnail)

    with open(filename, 'rb') as f, open(thumbnail, 'rb') as t:

        track = Track(

            title=filename[15:-4], 
            artist='Sam Gellaitry',
            description='No desc.', 
            thumbnail=t.read(),
            binary_audio=f.read()
        )

    # Return the Track instead of adding it here: a SQLAlchemy
    # Session is not thread-safe, so all session operations are
    # kept in the request thread.
    return track

The resize_image function, included for completeness:

def resize_image(thumbnail):

    with Image.open(thumbnail) as img:
        # resize() returns a new image rather than modifying in place
        img = img.resize((500, 500))
        img.save(thumbnail)

    return thumbnail
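
As an aside, resize((500, 500)) squashes images whose aspect ratio is not 1:1. If the thumbnails should keep their proportions, Pillow's Image.thumbnail is an option; a sketch of the same helper using it:

def resize_image(thumbnail):

    with Image.open(thumbnail) as img:
        # thumbnail() resizes in place, preserves the aspect
        # ratio, and never enlarges the image
        img.thumbnail((500, 500))
        img.save(thumbnail)

    return thumbnail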

And the benchmarks:

/test_concurrent (with print statements)
Uploaded all tracks in 0.7054300308227539 seconds.

/test_sync (with print statements)
Uploaded all tracks in 1.8661110401153564 seconds.

------
/test_concurrent (with db.session.add/db.session.commit)
Uploaded all tracks in 5.303245782852173 seconds.

/test_sync (with db.session.add/db.session.commit)
Uploaded all tracks in 6.123792886734009 seconds.
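
To pin down where the time goes, it can help to time the file work and the database write separately. A minimal sketch of such a split-timer route (the /test_concurrent_timed name is made up; it reuses add_one_file from above):

@app.route('/test_concurrent_timed')
def auto_add_concurrent_timed():

    MAX_WORKERS = 40
    t0 = time.time()

    os.chdir('my_app/static/tracks')
    mp3_list = [x for x in os.listdir('.') if x.endswith('.mp3')]

    with futures.ThreadPoolExecutor(MAX_WORKERS) as executor:
        tracks = [t for t in executor.map(add_one_file, mp3_list) if t is not None]

    t1 = time.time()  # all file reading and resizing done

    for track in tracks:
        db.session.add(track)
    db.session.commit()

    t2 = time.time()  # flush and commit done

    return f'Files: {t1 - t0:.3f}s, DB: {t2 - t1:.3f}s.'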

What is this concurrent code doing, and how can it be optimized?

python sqlite sqlalchemy concurrent.futures
1 Answer

It seems the DB writes dominate your timing, and writing many rows to the same table generally does not benefit from parallelization. Instead of adding the ORM objects to the session one by one, perform a bulk insert:

db.session.bulk_save_objects(res)
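
Applied to the route above, the tail of auto_add_concurrent might look like this (a sketch; the None filter assumes add_one_file returns None for missing thumbnails):

    with futures.ThreadPoolExecutor(MAX_WORKERS) as executor:
        tracks = [t for t in executor.map(add_one_file, mp3_list) if t is not None]

    # One bulk INSERT instead of one INSERT per Track
    db.session.bulk_save_objects(tracks)
    db.session.commit()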

In your current code the ORM has to insert the Track objects one at a time, during the flush that precedes the commit, so that it can fetch each object's primary key after the insert. Session.bulk_save_objects skips this by default, which makes the objects less usable afterwards (they are not added to the session, for example), but that does not seem to be a problem in your case.
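
If the generated primary keys are needed after all, bulk_save_objects accepts return_defaults=True, which restores the row-by-row INSERT behaviour (and most of its cost):

    # Fetches server-generated values such as primary keys,
    # at the price of one INSERT per object again
    db.session.bulk_save_objects(tracks, return_defaults=True)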

The Session.bulk_save_objects documentation is a good read on the subject.
