我需要在命令行中使用LaTeXML库将86,000个TEX文件转换为XML。我尝试使用subprocess
模块编写一个Python脚本来自动执行此操作,利用所有4个内核。
def get_outpath(tex_path):
path_parts = pathlib.Path(tex_path).parts
arxiv_id = path_parts[2]
outpath = 'xml/' + arxiv_id + '.xml'
return outpath
def convert_to_xml(inpath):
outpath = get_outpath(inpath)
if os.path.isfile(outpath):
message = '{}: Already converted.'.format(inpath)
print(message)
return
try:
process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
except Exception as error:
process.kill()
message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
print(message)
message = '{}: Converted!'.format(inpath)
print(message)
def start():
start_time = time.time()
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(),
maxtasksperchild=1)
print('Initialized {} threads'.format(multiprocessing.cpu_count()))
print('Beginning conversion...')
for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5):
pass
pool.close()
pool.join()
print("TIME: {}".format(total_time))
start()
该脚本导致Too many open files
并减慢我的计算机速度。通过查看Activity Monitor,看起来这个脚本试图一次创建86,000个转换子进程,并且每个进程都试图打开一个文件。也许这是pool.imap_unordered(convert_to_xml, preprints)
的结果 - 也许我不需要将地图与subprocess.Popen
结合使用,因为我只有太多的命令可以调用?什么是另类?
我花了一整天时间试图找出正确处理批量子处理的方法。我是Python的这一部分的新手,所以任何有关正确方向前进的提示都会非常感激。谢谢!
在convert_to_xml
中,process = subprocess.Popen(...)
语句产生了一个latexml
子过程。如果没有像process.communicate()
这样的阻挡调用,convert_to_xml
即使在latexml
继续在后台运行时也会结束。
由于convert_to_xml
结束,Pool会向关联的工作进程发送另一个要运行的任务,因此再次调用convert_to_xml
。再次在后台生成另一个latexml
进程。很快,您就可以了解latexml
进程中的眼球,并且达到了打开文件数量的资源限制。
修复很简单:添加process.communicate()
告诉convert_to_xml
等到latexml
进程完成。
try:
process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
process.communicate()
except Exception as error:
process.kill()
message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
print(message)
else: # use else so that this won't run if there is an Exception
message = '{}: Converted!'.format(inpath)
print(message)
关于if __name__ == '__main__'
:
作为martineau pointed out,有一个warning in the multiprocessing docs,生成新进程的代码不应该在模块的顶层调用。相反,代码应该包含在if __name__ == '__main__'
语句中。
在Linux中,如果你忽视这个警告,就不会发生任何可怕的事情。但在Windows中,代码“fork-bombs”。或者更准确地说,代码会导致产生一系列未经过处理的子进程,因为在Windows上,fork
是通过生成一个新的Python进程来模拟的,然后导入调用脚本。每次导入都会产生一个新的Python进程。每个Python进程都尝试导入调用脚本。在消耗所有资源之前,循环不会中断。
所以,为了对我们的Windows-fork-bereft兄弟们好一点,请使用
if __name__ == '__main__:
start()
有时进程需要大量内存。 The only reliable way释放内存就是终止进程。 maxtasksperchild=1
告诉pool
在完成1个任务后终止每个工作进程。然后它会产生一个新的工作进程来处理另一个任务(如果有的话)。这释放了原始工作者可能已分配的(内存)资源,否则这些资源无法被释放。
在你的情况下,它看起来不像工作进程需要很多内存,所以你可能不需要maxtasksperchild=1
。在convert_to_xml
中,process = subprocess.Popen(...)
语句产生了一个latexml
子过程。如果没有像process.communicate()
这样的阻挡调用,convert_to_xml
即使在latexml
继续在后台运行时也会结束。
由于convert_to_xml
结束,Pool会向关联的工作进程发送另一个要运行的任务,因此再次调用convert_to_xml
。再次在后台生成另一个latexml
进程。很快,您就可以了解latexml
进程中的眼球,并且达到了打开文件数量的资源限制。
修复很简单:添加process.communicate()
告诉convert_to_xml
等到latexml
进程完成。
try:
process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
process.communicate()
except Exception as error:
process.kill()
message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
print(message)
else: # use else so that this won't run if there is an Exception
message = '{}: Converted!'.format(inpath)
print(message)
chunksize
会影响工作人员在将结果发送回主进程之前执行的任务数。 Sometimes这会影响性能,特别是如果进程间通信是整个运行时的重要部分。
在你的情况下,convert_to_xml
需要相当长的时间(假设我们等到latexml
完成),它只是返回None
。因此,进程间通信可能不是整个运行时的重要部分。因此,我不认为在这种情况下您会发现性能发生重大变化(尽管实验从未受到伤害!)。
在普通的Python中,map
不应仅用于多次调用函数。
出于类似的风格原因,我会保留使用pool.*map*
方法来处理我关心返回值的情况。
而不是
for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5):
pass
你可以考虑使用
for preprint in preprints:
pool.apply_async(convert_to_xml, args=(preprint, ))
代替。
传递给任何pool.*map*
函数的iterable立即被消耗。迭代是否是迭代器并不重要。在这里使用迭代器没有特殊的内存优势。 imap_unordered
返回一个迭代器,但它不以任何特别是迭代器友好的方式处理它的输入。
无论你传递什么类型的iterable,在调用pool.*map*
函数时,iterable都被消耗并转换成放入任务队列的任务。
以下是证实此声明的代码:
version1.朋友:
import multiprocessing as mp
import time
def foo(x):
time.sleep(0.1)
return x * x
def gen():
for x in range(1000):
if x % 100 == 0:
print('Got here')
yield x
def start():
pool = mp.Pool()
for item in pool.imap_unordered(foo, gen()):
pass
pool.close()
pool.join()
if __name__ == '__main__':
start()
version2.朋友:
import multiprocessing as mp
import time
def foo(x):
time.sleep(0.1)
return x * x
def gen():
for x in range(1000):
if x % 100 == 0:
print('Got here')
yield x
def start():
pool = mp.Pool()
for item in gen():
result = pool.apply_async(foo, args=(item, ))
pool.close()
pool.join()
if __name__ == '__main__':
start()
运行version1.py
和version2.py
都会产生相同的结果。
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
至关重要的是,您会注意到Got here
在运行开始时非常快速地打印了10次,然后在程序结束之前有一个很长的暂停(计算完成时)。
如果生成器gen()
以某种方式被pool.imap_unordered
缓慢消耗,我们应该期望Got here
也能慢慢打印。由于Got here
被打印10次并且很快,我们可以看到可迭代的gen()
在完成任务之前已经被完全消耗掉了。
运行这些程序应该让你有信心pool.imap_unordered
和pool.apply_async
基本上以相同的方式将任务放入队列中:在调用之后立即执行。