python + Thread\TrocessPoolExecutor

问题描述 投票:0回答:1

我是并发期货的新手。下面的脚本有问题吗?

def read_excel(self):
    '''
    read excel -> parse df -> store into sql
    '''

    file_list = ['分诊咨询记录','客户消费明细表','电话网络情况明细表']
    tpool = ThreadPoolExecutor(max_workers=3)
    ppool = ProcessPoolExecutor(max_workers=3)
    tfutures = list()
    pfutures = list()
    write_futures = list()
    for file in file_list:
        path = os.path.join(self.config['his']['root'],file)
        if os.path.exists(path):
            tfutures.append(tpool.submit(self._read_excel,path,file)) 
    for tfuture in as_completed(tfutures):
        pfutures.append(ppool.submit(self._parse_df,tfuture.result()))
    for pfuture in as_completed(pfutures):
        result = pfuture.result()
        write_futures.append(tpool.submit(self.update_data,result[1],result[0]))
    for _ in as_completed(write_futures):pass
    tpool.shutdown()
    ppool.shutdown()

for循环会阻塞代码吗?我的意思是,当第一个read_excel完成时,第一个parse_df会开始工作吗?如果 read_excel 池没有完成所有任务,并且第一个 parse_df 完成了怎么办?第一个 update_data 会开始工作吗?

希望剧本不会被封。任务完成后,应立即开始下一步。

  1. 脚本会导致问题吗?
  2. 脚本是否达到我的目标?如何实现我的目标?
python multithreading multiprocessing
1个回答
0
投票

是的,

as_completed
函数返回一个迭代器,在完成时生成futures。第二个和第三个
for
循环将阻塞,直到前一个
for
循环中提交的所有任务完成。如果我理解正确的话,您希望任务完成后立即开始下一步。

您可以使用单个

ThreadPoolExecutor
并使用
Future.add_done_callback
方法链接任务:

from concurrent.futures import ThreadPoolExecutor
import os
def read_excel(self):
    file_list = ['分诊咨询记录','客户消费明细表','电话网络情况明细表']
    pool = ThreadPoolExecutor(max_workers=3)
    def on_read_done(future):
        df, file = future.result()
        parse_future = pool.submit(self._parse_df, df)
        parse_future.add_done_callback(on_parse_done)
    def on_parse_done(future):
        parsed_data, file = future.result()
        updated_future = pool.submit(self.update_data, parsed_data, file)
    for file in file_list:
        path = os.path.join(self.config['his']['root'], file)
        if os.path.exists(path):
            read_future = pool.submit(self._read_excel, path, file)
            read_future.add_done_callback(on_read_done)
    pool.shutdown()
© www.soinside.com 2019 - 2024. All rights reserved.