Can ProcessPoolExecutor be used with a yield generator in Python?


I have a Python script that is meant to process some large files and write the results to a new txt file. I have simplified it into Code Example 1.

Code Example 1:

from concurrent.futures import ProcessPoolExecutor
import pandas as pd
from collections import OrderedDict
import os

def process(args):
    parm1, parm2, parm3, output_file = args
    scores = []
    # do something with scores...
    result = '\t'.join(scores)
    output_file.write(result)

def main(large_file_path, output_path, max_processes):
    #do something ...
    with open(large_file_path, 'r') as large_file, open(output_path, 'w') as output_file:
        arg_list = []
        parm1, parm2, parm3 = '1', 0, 0
        for line in large_file:
            if line.startswith('#'):
                continue
            else:
                #do something to update parm1, parm2, parm3...
                arg_list.append((parm1, parm2, parm3, output_file))
        with ProcessPoolExecutor(max_processes) as executor:
            executor.map(process, arg_list, chunksize=int(max_processes/2))

if __name__ == "__main__":
    large_path = "/path/to/large_file"
    output_path = "para_scores.txt"
    max_processes = int(os.cpu_count() / 2)  # set the maximum number of processes

    main(large_path, output_path, max_processes)

I realized that if large_file is big, arg_list could become very large as well, and I wasn't sure there would be enough memory available for it. So I tried using a yield generator instead of a plain Python list, as in Code Example 2. It runs without errors, but it doesn't produce anything.

Code Example 2:

from concurrent.futures import ProcessPoolExecutor
import pandas as pd
from collections import OrderedDict
import os

def process(args):
    parm1, parm2, parm3, output_file = args
    scores = []
    # do something with scores...
    result = '\t'.join(scores)
    output_file.write(result)

def main(large_file_path, output_path, max_processes):
    #do something ...
    with open(large_file_path, 'r') as large_file, open(output_path, 'w') as output_file:
        def arg_generator(large_file, output_file):
            parm1, parm2, parm3 = '1', 0, 0
            for line in large_file:
                if line.startswith('#'):
                    continue
                else:
                    #do something to update parm1, parm2, parm3...
                    yield (parm1, parm2, parm3, output_file)
        with ProcessPoolExecutor(max_processes) as executor:
            executor.map(process, arg_generator(large_file, output_file), chunksize=int(max_processes/2))

if __name__ == "__main__":
    large_path = "/path/to/large_file"
    output_path = "para_scores.txt"
    max_processes = int(os.cpu_count() / 2)  # set the maximum number of processes

    main(large_path, output_path, max_processes)

I am running the code on an Ubuntu 20.04.6 LTS server with Python 3.9.18.

So, can ProcessPoolExecutor be used with a yield generator in Python? Or is there something wrong with how executor.map is used? What should I do to make this work?

python linux generator large-files concurrent.futures
1 Answer

Yes, it can. Here is an example:

from concurrent.futures import ProcessPoolExecutor


def process(x):
    print(x * 2)


def gen():
    for i in range(3):
        yield i


def main():
    print('starting')
    with ProcessPoolExecutor() as executor:
        executor.map(process, gen())
    print('done')


if __name__ == '__main__':
    main()

Output:

starting
0
2
4
done

The worker processes don't know where their arguments came from; each one receives a single item, not the whole generator. Using a generator makes no difference.

However, trying to pass an open file handle does seem to break things in a strange way:

from concurrent.futures import ProcessPoolExecutor


def process(f):
    print('processing', f)


def main():
    print('starting')
    with open('path_to_file.txt') as f:
        with ProcessPoolExecutor() as executor:
            executor.map(process, [f])
    print('done')


if __name__ == '__main__':
    main()

The output is missing the "processing" line:

starting
done
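
A likely reason this example (and Code Example 2 in the question) appears to run fine while writing nothing is that an open file handle cannot be pickled, and because the iterator returned by executor.map is never consumed, the resulting error is never re-raised in the parent process. Below is a minimal sketch of how to surface the hidden exception, assuming that is indeed what is happening; the file name is just a placeholder:

from concurrent.futures import ProcessPoolExecutor


def process(f):
    print('processing', f)


def main():
    print('starting')
    with open('path_to_file.txt') as f:
        with ProcessPoolExecutor() as executor:
            results = executor.map(process, [f])
            # Consuming the iterator re-raises any exception from submitting
            # or running the tasks instead of letting it vanish silently;
            # here it is expected to be a pickling error such as
            # "TypeError: cannot pickle '_io.TextIOWrapper' object".
            for _ in results:
                pass
    print('done')


if __name__ == '__main__':
    main()

Doing the same in Code Example 2 should turn the silent "produces nothing" behaviour into a visible traceback.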

Instead, pass the file name to each process as a string, and use a different file name for each process so they don't overwrite each other.
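
Applied to Code Example 2, that could look roughly like the sketch below: each task receives an output path (a plain string, which pickles fine) instead of the shared open handle, and writes its scores to its own part file. The output_dir argument, the per-line counter and the part-file naming scheme are illustrative assumptions, not part of the original script:

from concurrent.futures import ProcessPoolExecutor
import os

def process(args):
    # The last element is now a plain string path rather than an open file.
    parm1, parm2, parm3, part_path = args
    scores = []
    # do something with scores...
    result = '\t'.join(scores)
    with open(part_path, 'w') as part_file:
        part_file.write(result)

def main(large_file_path, output_dir, max_processes):
    os.makedirs(output_dir, exist_ok=True)
    with open(large_file_path, 'r') as large_file:
        def arg_generator(large_file):
            parm1, parm2, parm3 = '1', 0, 0
            for i, line in enumerate(large_file):
                if line.startswith('#'):
                    continue
                # do something to update parm1, parm2, parm3...
                # A distinct path per task keeps the workers from
                # overwriting each other's output.
                part_path = os.path.join(output_dir, f'para_scores_{i}.txt')
                yield (parm1, parm2, parm3, part_path)
        with ProcessPoolExecutor(max_processes) as executor:
            executor.map(process, arg_generator(large_file),
                         chunksize=max(max_processes // 2, 1))

if __name__ == '__main__':
    main('/path/to/large_file', 'para_scores_parts', max(os.cpu_count() // 2, 1))

The part files can then be concatenated into a single para_scores.txt once the pool has shut down; alternatively, the workers could return their result strings and let the parent process do all the writing.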
