I have a project where I need to upload ~70 files to my Flask app. I'm learning about concurrency right now, so this seemed like perfect practice. When measured with print statements, the concurrent version of this function is roughly 2x to 2.5x faster than the synchronous one.
However, when actually writing to a SQLite database, both versions take roughly the same amount of time.
Original function:
@app.route('/test_sync')
def auto_add():
    t0 = time.time()
    # Code does not work without changing directory. better option?
    os.chdir('my_app/static/tracks')
    list_dir = os.listdir('my_app/static/tracks')
    # list_dir consists of .mp3 and .jpg files
    for filename in list_dir:
        if filename.endswith('.mp3'):
            try:
                thumbnail = [thumb for thumb in list_dir if thumb == filename[:-4] + '.jpg'][0]
            except Exception:
                print(f'ERROR - COULD NOT FIND THUMB for { filename }')
            resize_image(thumbnail)
            with open(filename, 'rb') as f, open(thumbnail, 'rb') as t:
                track = Track(
                    title=filename[15:-4],
                    artist='Sam Gellaitry',
                    description='No desc.',
                    thumbnail=t.read(),
                    binary_audio=f.read()
                )
        else:
            continue
        db.session.add(track)
    db.session.commit()
    elapsed = time.time() - t0
    return f'Uploaded all tracks in {elapsed} seconds.'
Concurrent function:
@app.route('/test_concurrent')
def auto_add_concurrent():
    t0 = time.time()
    MAX_WORKERS = 40
    os.chdir('/my_app/static/tracks')
    list_dir = os.listdir('/my_app/static/tracks')
    mp3_list = [x for x in list_dir if x.endswith('.mp3')]
    with futures.ThreadPoolExecutor(MAX_WORKERS) as executor:
        res = executor.map(add_one_file, mp3_list)
    for x in res:
        db.session.add(x)
    db.session.commit()
    elapsed = time.time() - t0
    return f'Uploaded all tracks in {elapsed} seconds.'
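For context on the `executor.map` call above: it schedules the function for every input item on the thread pool, and yields the results lazily in input order (not completion order). A minimal stdlib sketch of that behavior, using a hypothetical `load_length` as a stand-in for `add_one_file`:

```python
from concurrent import futures

def load_length(name):
    # Stand-in for add_one_file: does trivial work and returns a value.
    return len(name)

names = ['a.mp3', 'bb.mp3', 'ccc.mp3']

with futures.ThreadPoolExecutor(max_workers=4) as executor:
    # map() runs load_length on the pool for every name, but yields the
    # results in the same order as `names`, not in completion order.
    results = list(executor.map(load_length, names))

print(results)  # [5, 6, 7]
```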
-----
def add_one_file(filename):
    list_dir = os.listdir('/my_app/static/tracks')
    try:
        thumbnail = [thumb for thumb in list_dir if thumb == filename[:-4] + '.jpg'][0]
    except Exception:
        print(f'ERROR - COULD NOT FIND THUMB for { filename }')
    resize_image(thumbnail)
    with open(filename, 'rb') as f, open(thumbnail, 'rb') as t:
        track = Track(
            title=filename[15:-4],
            artist='Sam Gellaitry',
            description='No desc.',
            thumbnail=t.read(),
            binary_audio=f.read()
        )
    return track
And the resize_image function, included here for completeness:
def resize_image(thumbnail):
    with Image.open(thumbnail) as img:
        # resize() returns a new image rather than modifying in place,
        # so the result must be assigned back before saving
        img = img.resize((500, 500))
        img.save(thumbnail)
    return thumbnail
And the benchmarks:
/test_concurrent (with print statements)
Uploaded all tracks in 0.7054300308227539 seconds.
/test_sync
Uploaded all tracks in 1.8661110401153564 seconds.
------
/test_concurrent (with db.session.add/db.session.commit)
Uploaded all tracks in 5.303245782852173 seconds.
/test_sync
Uploaded all tracks in 6.123792886734009 seconds.
What is this concurrent code doing, and how can I optimize it?
It seems the DB writes dominate your timing, and writing many rows to the same table does not usually benefit from parallelization. Instead of adding the ORM objects to the session one by one, perform a bulk insert:
db.session.bulk_save_objects(res)
In your current code the ORM has to insert the Track objects one at a time during a flush, just before the commit, in order to fetch their primary keys after insertion. Session.bulk_save_objects does not do that by default, which makes the objects less usable afterwards (they are not added to the session, for example), but that does not seem to be a problem in your case.
The Session.bulk_save_objects documentation is a good read on the subject.
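The same batching idea can be sketched at the raw sqlite3 level (a hypothetical tracks table, not your Track model): instead of issuing one INSERT per row, executemany sends all rows in a single statement inside one transaction, which is where the time savings come from.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE tracks (title TEXT, artist TEXT)')

rows = [(f'track-{i}', 'Sam Gellaitry') for i in range(70)]

# Batched insert: all 70 rows go in as one statement, in one
# transaction, analogous to bulk_save_objects + a single commit.
with conn:
    conn.executemany('INSERT INTO tracks VALUES (?, ?)', rows)

count = conn.execute('SELECT COUNT(*) FROM tracks').fetchone()[0]
print(count)  # 70
```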