我是并发期货的新手。下面的脚本有问题吗?
def read_excel(self):
'''
read excel -> parse df -> store into sql
'''
file_list = ['分诊咨询记录','客户消费明细表','电话网络情况明细表']
tpool = ThreadPoolExecutor(max_workers=3)
ppool = ProcessPoolExecutor(max_workers=3)
tfutures = list()
pfutures = list()
write_futures = list()
for file in file_list:
path = os.path.join(self.config['his']['root'],file)
if os.path.exists(path):
tfutures.append(tpool.submit(self._read_excel,path,file))
for tfuture in as_completed(tfutures):
pfutures.append(ppool.submit(self._parse_df,tfuture.result()))
for pfuture in as_completed(pfutures):
result = pfuture.result()
write_futures.append(tpool.submit(self.update_data,result[1],result[0]))
for _ in as_completed(write_futures):pass
tpool.shutdown()
ppool.shutdown()
for循环会阻塞代码吗?我的意思是,当第一个read_excel完成时,第一个parse_df会开始工作吗?如果 read_excel 池没有完成所有任务,并且第一个 parse_df 完成了怎么办?第一个 update_data 会开始工作吗?
希望剧本不会被封。任务完成后,应立即开始下一步。
是的,
as_completed
函数返回一个迭代器,在完成时生成futures。第二个和第三个 for
循环将阻塞,直到前一个 for
循环中提交的所有任务完成。如果我理解正确的话,您希望任务完成后立即开始下一步。
您可以使用单个
ThreadPoolExecutor
并使用 Future.add_done_callback
方法链接任务:
from concurrent.futures import ThreadPoolExecutor
import os
def read_excel(self):
file_list = ['分诊咨询记录','客户消费明细表','电话网络情况明细表']
pool = ThreadPoolExecutor(max_workers=3)
def on_read_done(future):
df, file = future.result()
parse_future = pool.submit(self._parse_df, df)
parse_future.add_done_callback(on_parse_done)
def on_parse_done(future):
parsed_data, file = future.result()
updated_future = pool.submit(self.update_data, parsed_data, file)
for file in file_list:
path = os.path.join(self.config['his']['root'], file)
if os.path.exists(path):
read_future = pool.submit(self._read_excel, path, file)
read_future.add_done_callback(on_read_done)
pool.shutdown()