我正在尝试模拟Python方法目标的
subprocess.Popen stdout / stderr
,这意味着我想创建N个并行作业,并具有独特的stdout / stderr处理程序。
这是我当前的代码:
#!/bin/python
import time
import multiprocessing as mp
class Job(object):
def __init__(self, target, *a, **kw):
self.target = target
self.args = a
self.kwargs = kw
def parallelize(jobs):
"""
Args:
jobs (list): list of the jobs to run. with all its params
"""
procs = [mp.Process(target=job.target, args=job.args, kwargs=job.kwargs) for job in jobs]
for p in procs:
p.start()
for p in procs:
p.join()
def dummy(*a, **kw):
print a
time.sleep(1)
print kw
def main():
parallelize([Job(dummy) for i in range(2)])
main()
仅并行化作业。输出仍然显示在屏幕上。如果我使用
subprocess.Popen()
,我可以为每个进程创建添加 stdout=PIPE()
参数,并将其值存储在一个对象中,稍后从 parallelize()
返回,但我不知道如何实现。
multiprocessing提供了一些选项,例如使用
conn
,但这没有帮助,因为我正在运行黑盒方法,并且我无法覆盖它们并使它们打印发送到conn
而不是打印。
This页面建议捕获stdout,但我认为这不好,因为并行进程会输出到同一个地方,没有分离能力...
如何通过 IO 控制实现并行进程(使用 Python 方法目标)?
我想到了类似
subprocess.Popen('python -c "import my_module; my_module.my_method()", stdout=subprocess.PIPE)
的东西,但我认为它太老套了。
contextlib.redirect_stdout
上下文管理器中(如果需要,还可以使用 contextlib.redirect_stderr
):
import time
import multiprocessing as mp
from contextlib import redirect_stdout
class Job(object):
def __init__(self, target, number, *a, **kw):
self.target = target
self.number = number
self.args = a
self.kwargs = kw
def run(self):
with open(f'output_{self.number}.log', 'w') as file, redirect_stdout(file):
return self.target(*self.args, **self.kwargs)
def parallelize(jobs):
procs = [mp.Process(target=job.run) for job in jobs]
for p in procs:
p.start()
for p in procs:
p.join()
def dummy(*a, **kw):
print(a)
time.sleep(1)
print(kw)
def main():
parallelize([Job(dummy, i, f'foo{i}', bar=i) for i in range(2)])
if __name__ == '__main__':
main()
这将生成以下两个文件:
# output_0.log
('foo0',)
{'bar': 0}
和:
# output_1.log
('foo1',)
{'bar': 1}