具有连续无界输入的Python ThreadPoolExecutor

问题描述 投票:0回答:1

我在服务器上有一个文件夹,该文件夹将全天连续接收一些文件。我需要查看目录,一旦收到文件,就需要对该文件进行一些处理。有时,根据文件大小(最多可达到20 GB),处理可能需要更长的时间。

我正在使用current.futures.ThreadPoolExecutor一次处理多个文件。但是,在了解如何处理以下情况时,我需要一些帮助:-

我一次收到5个文件(4个小文件和1个大文件),ThreadPoolExecutor拾取了全部5个文件进行处理。处理4个小文件需要几秒钟,但是处理大文件则需要20分钟。现在,在处理大文件时,文件夹中还有10个文件正在等待。

我设置了max_workers = 5,但是现在只有一个ThreadPoolExecutor工作程序运行以处理大文件,这阻止了下一组文件的执行。那时候有4个工作者有空,我们如何开始处理其他文件。


import os
import time
import random
import concurrent.futures
import datetime
import functools

def process_file(file1, input_num):
    # Do some processing
    os.remove(os.path.join('C:\\temp\\abcd',file1))
    time.sleep(10)    

def main():
    print("Start Time is ",datetime.datetime.now())

    #It will be a continuous loop which will watch a directory for incoming file
    while True:
        #Get the list of files in directory
        file_list = os.listdir('C:\\temp\\abcd')
        print("file_list is", file_list)
        input_num = random.randint(1000000000,9999999999)

        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            process_file_arg = functools.partial(process_file, input_num = input_num)
            executor.map(process_file_arg, file_list)

        time.sleep(10)

if __name__ == '__main__':
    main()

main()函数连续监视目录并调用ThreadPoolExecutor

python python-3.x concurrency python-multithreading concurrent.futures
1个回答
0
投票

我遇到了同样的问题,this answer可能会帮助您。

concurrent.futures.waitfutures返回到命名的2元组集合donenot_done中,因此我们可以删除done部分并将新任务添加到not_done中使并行作业连续的线程列表,下面是一个示例片段:

thread_list = []
with open(input_filename, 'r') as fp_in:
    with concurrent.futures.ThreadPoolExecutor(max_workers=THREAD_LIMIT) as executor:
        thread_list.append(executor.submit(your_thread_func, para_list))
        if len(thread_list) >= THREAD_LIMIT:
            done, not_done = concurrent.futures.wait(thread_list, timeout=1,
                                                     return_when=concurrent.futures.FIRST_COMPLETED)
            # consume finished
            done_res = [i.result() for i in done]
            # and keep unfinished
            thread_list = list(not_done)                  
© www.soinside.com 2019 - 2024. All rights reserved.