如何使用所有CPU对大量文件进行子处理?

问题描述 投票:2回答:1

我需要在命令行中使用LaTeXML库将86,000个TEX文件转换为XML。我尝试使用subprocess模块编写一个Python脚本来自动执行此操作,利用所有4个内核。

def get_outpath(tex_path):
    path_parts = pathlib.Path(tex_path).parts
    arxiv_id = path_parts[2]
    outpath = 'xml/' + arxiv_id + '.xml'
    return outpath

def convert_to_xml(inpath):
    outpath = get_outpath(inpath)

    if os.path.isfile(outpath):
        message = '{}: Already converted.'.format(inpath)
        print(message)
        return

    try:
        process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath], 
                                   stderr=subprocess.PIPE, 
                                   stdout=subprocess.PIPE)
    except Exception as error:
        process.kill()
        message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
        print(message)

    message = '{}: Converted!'.format(inpath)
    print(message)

def start():
    start_time = time.time()
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(),
                               maxtasksperchild=1)
    print('Initialized {} threads'.format(multiprocessing.cpu_count()))
    print('Beginning conversion...')
    for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5): 
        pass
    pool.close()
    pool.join()
    print("TIME: {}".format(total_time))

start()

该脚本导致Too many open files并减慢我的计算机速度。通过查看Activity Monitor,看起来这个脚本试图一次创建86,000个转换子进程,并且每个进程都试图打开一个文件。也许这是pool.imap_unordered(convert_to_xml, preprints)的结果 - 也许我不需要将地图与subprocess.Popen结合使用,因为我只有太多的命令可以调用?什么是另类?

我花了一整天时间试图找出正确处理批量子处理的方法。我是Python的这一部分的新手,所以任何有关正确方向前进的提示都会非常感激。谢谢!

python multiprocessing subprocess pool python-3.7
1个回答
3
投票

convert_to_xml中,process = subprocess.Popen(...)语句产生了一个latexml子过程。如果没有像process.communicate()这样的阻挡调用,convert_to_xml即使在latexml继续在后台运行时也会结束。

由于convert_to_xml结束,Pool会向关联的工作进程发送另一个要运行的任务,因此再次调用convert_to_xml。再次在后台生成另一个latexml进程。很快,您就可以了解latexml进程中的眼球,并且达到了打开文件数量的资源限制。

修复很简单:添加process.communicate()告诉convert_to_xml等到latexml进程完成。

try:
    process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath], 
                               stderr=subprocess.PIPE, 
                               stdout=subprocess.PIPE)
    process.communicate()                                   
except Exception as error:
    process.kill()
    message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
    print(message)

else: # use else so that this won't run if there is an Exception
    message = '{}: Converted!'.format(inpath)
    print(message)

关于if __name__ == '__main__'

作为martineau pointed out,有一个warning in the multiprocessing docs,生成新进程的代码不应该在模块的顶层调用。相反,代码应该包含在if __name__ == '__main__'语句中。

在Linux中,如果你忽视这个警告,就不会发生任何可怕的事情。但在Windows中,代码“fork-bombs”。或者更准确地说,代码会导致产生一系列未经过处理的子进程,因为在Windows上,fork是通过生成一个新的Python进程来模拟的,然后导入调用脚本。每次导入都会产生一个新的Python进程。每个Python进程都尝试导入调用脚本。在消耗所有资源之前,循环不会中断。

所以,为了对我们的Windows-fork-bereft兄弟们好一点,请使用

if __name__ == '__main__:
    start()

有时进程需要大量内存。 The only reliable way释放内存就是终止进程。 maxtasksperchild=1告诉pool在完成1个任务后终止每个工作进程。然后它会产生一个新的工作进程来处理另一个任务(如果有的话)。这释放了原始工作者可能已分配的(内存)资源,否则这些资源无法被释放。

在你的情况下,它看起来不像工作进程需要很多内存,所以你可能不需要maxtasksperchild=1。在convert_to_xml中,process = subprocess.Popen(...)语句产生了一个latexml子过程。如果没有像process.communicate()这样的阻挡调用,convert_to_xml即使在latexml继续在后台运行时也会结束。

由于convert_to_xml结束,Pool会向关联的工作进程发送另一个要运行的任务,因此再次调用convert_to_xml。再次在后台生成另一个latexml进程。很快,您就可以了解latexml进程中的眼球,并且达到了打开文件数量的资源限制。

修复很简单:添加process.communicate()告诉convert_to_xml等到latexml进程完成。

try:
    process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath], 
                               stderr=subprocess.PIPE, 
                               stdout=subprocess.PIPE)
    process.communicate()                                   
except Exception as error:
    process.kill()
    message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
    print(message)

else: # use else so that this won't run if there is an Exception
    message = '{}: Converted!'.format(inpath)
    print(message)

chunksize会影响工作人员在将结果发送回主进程之前执行的任务数。 Sometimes这会影响性能,特别是如果进程间通信是整个运行时的重要部分。

在你的情况下,convert_to_xml需要相当长的时间(假设我们等到latexml完成),它只是返回None。因此,进程间通信可能不是整个运行时的重要部分。因此,我不认为在这种情况下您会发现性能发生重大变化(尽管实验从未受到伤害!)。


在普通的Python中,map不应仅用于多次调用函数。

出于类似的风格原因,我会保留使用pool.*map*方法来处理我关心返回值的情况。

而不是

for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5): 
    pass

你可以考虑使用

for preprint in preprints:
    pool.apply_async(convert_to_xml, args=(preprint, ))

代替。


传递给任何pool.*map*函数的iterable立即被消耗。迭代是否是迭代器并不重要。在这里使用迭代器没有特殊的内存优势。 imap_unordered返回一个迭代器,但它不以任何特别是迭代器友好的方式处理它的输入。

无论你传递什么类型的iterable,在调用pool.*map*函数时,iterable都被消耗并转换成放入任务队列的任务。

以下是证实此声明的代码:

version1.朋友:

import multiprocessing as mp
import time

def foo(x):
    time.sleep(0.1)
    return x * x


def gen():
    for x in range(1000):
        if x % 100 == 0:
            print('Got here')
        yield x


def start():
    pool = mp.Pool()
    for item in pool.imap_unordered(foo, gen()):
        pass

    pool.close()
    pool.join()

if __name__ == '__main__':
    start()

version2.朋友:

import multiprocessing as mp
import time
def foo(x):
    time.sleep(0.1)
    return x * x


def gen():
    for x in range(1000):
        if x % 100 == 0:
            print('Got here')
        yield x


def start():
    pool = mp.Pool()

    for item in gen():
        result = pool.apply_async(foo, args=(item, ))

    pool.close()
    pool.join()

if __name__ == '__main__':
    start()

运行version1.pyversion2.py都会产生相同的结果。

Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here

至关重要的是,您会注意到Got here在运行开始时非常快速地打印了10次,然后在程序结束之前有一个很长的暂停(计算完成时)。

如果生成器gen()以某种方式被pool.imap_unordered缓慢消耗,我们应该期望Got here也能慢慢打印。由于Got here被打印10次并且很快,我们可以看到可迭代的gen()在完成任务之前已经被完全消耗掉了。

运行这些程序应该让你有信心pool.imap_unorderedpool.apply_async基本上以相同的方式将任务放入队列中:在调用之后立即执行。

© www.soinside.com 2019 - 2024. All rights reserved.