我在服务器上有一个文件夹,该文件夹将全天连续接收一些文件。我需要查看目录,一旦收到文件,就需要对该文件进行一些处理。有时,根据文件大小(最多可达到20 GB),处理可能需要更长的时间。
我正在使用current.futures.ThreadPoolExecutor一次处理多个文件。但是,在了解如何处理以下情况时,我需要一些帮助:-
我一次收到5个文件(4个小文件和1个大文件),ThreadPoolExecutor拾取了全部5个文件进行处理。处理4个小文件需要几秒钟,但是处理大文件则需要20分钟。现在,在处理大文件时,文件夹中还有10个文件正在等待。
我设置了max_workers = 5,但是现在只有一个ThreadPoolExecutor工作程序运行以处理大文件,这阻止了下一组文件的执行。那时候有4个工作者有空,我们如何开始处理其他文件。
import os
import time
import random
import concurrent.futures
import datetime
import functools
def process_file(file1, input_num):
# Do some processing
os.remove(os.path.join('C:\\temp\\abcd',file1))
time.sleep(10)
def main():
print("Start Time is ",datetime.datetime.now())
#It will be a continuous loop which will watch a directory for incoming file
while True:
#Get the list of files in directory
file_list = os.listdir('C:\\temp\\abcd')
print("file_list is", file_list)
input_num = random.randint(1000000000,9999999999)
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
process_file_arg = functools.partial(process_file, input_num = input_num)
executor.map(process_file_arg, file_list)
time.sleep(10)
if __name__ == '__main__':
main()
main()函数连续监视目录并调用ThreadPoolExecutor
我遇到了同样的问题,this answer可能会帮助您。
concurrent.futures.wait
将futures返回到命名的2元组集合done
和not_done
中,因此我们可以删除done
部分并将新任务添加到not_done
中使并行作业连续的线程列表,下面是一个示例片段:
thread_list = []
with open(input_filename, 'r') as fp_in:
with concurrent.futures.ThreadPoolExecutor(max_workers=THREAD_LIMIT) as executor:
thread_list.append(executor.submit(your_thread_func, para_list))
if len(thread_list) >= THREAD_LIMIT:
done, not_done = concurrent.futures.wait(thread_list, timeout=1,
return_when=concurrent.futures.FIRST_COMPLETED)
# consume finished
done_res = [i.result() for i in done]
# and keep unfinished
thread_list = list(not_done)