我想在 Jupyter 笔记本中并行执行长时间运行的函数,而不阻塞笔记本本身。但是,我还想随时将完全不相关的作业添加到队列中,即使队列仍在处理以前的作业。基本上,这类似于用于计算集群的作业调度程序的极其简单的版本。
我目前的实现是这样的。首先创建一个工作池和一个作业队列。
import multiprocessing
def worker(job_queue, result_queue):
while True:
job = job_queue.get()
function, args, kwargs = job
result = function(*args, **kwargs)
result_queue.put(result)
job_queue = multiprocessing.Queue()
result_queue = multiprocessing.Queue()
pool = multiprocessing.Pool(processes=2, initializer=worker, initargs=[job_queue, result_queue])
后来,在一个单独的单元格中,我们定义了一个需要很长时间才能计算的函数,因此我们希望在不阻塞笔记本的情况下运行它
def slow_function(x):
import time
time.sleep(3)
return x+1
然后我们将多个作业发送到队列以在后台并行执行
job_queue.put([slow_function, [1], {}])
job_queue.put([slow_function, [2], {}])
然后,当之前的作业仍在运行时,我们定义另一个函数,并将其也添加到队列中
def complex_function(x):
return slow_function(x)*2
job_queue.put([complex_function, [1], {}])
job_queue.put([complex_function, [2], {}])
我们继续在Jupyter笔记本中运行其他东西一段时间,然后在所有作业完成后,我们得到结果
result1 = result_queue.get(False)
result2 = result_queue.get(False)
result3 = result_queue.get(False)
result4 = result_queue.get(False)
这是一般工作流程,我希望能够做到。
按照实施方式,创建池后,工作进程立即崩溃并在无限循环中重新启动,并出现 pickle 错误
AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Users\yy51\AppData\Local\Programs\Miniconda3\Lib\multiprocessing\spawn.py", line 122, in spawn_main
exitcode = _main(fd, parent_sentinel)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\yy51\AppData\Local\Programs\Miniconda3\Lib\multiprocessing\spawn.py", line 132, in _main
self = reduction.pickle.load(from_parent)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
如果我切换到使用
pathos
使用 pool = pathos.multiprocessing.Pool(processes=2, initializer=worker, initargs=[job_queue, result_queue])
,那么我会得到 RuntimeError: Queue objects should only be shared between processes through inheritance
如果我将
multiprocessing.Queue
替换为 pathos
队列 pathos.helpers.mp.Queue
,我就可以让它工作了
job_queue = pathos.helpers.mp.Queue()
result_queue = pathos.helpers.mp.Queue()
但是如果我尝试在
complex_function
中调用另一个定义的函数,它仍然不起作用
job_queue.put([complex_function, [1], {}])
NameError: name 'slow_function' is not defined
我也真的、真的不想创建一个单独的文件来在模块中托管
slow_function
或 complex_function
,因为这对于交互式笔记本使用来说非常不方便。
注意:我在 Windows 上运行此程序。
玩过一次,这就是我的评论:
import sys
import asyncio
import concurrent.futures
import ipywidgets
threadpool = concurrent.futures.ThreadPoolExecutor(4)
def run(fn, *args, **kwds):
"run fn in threadpool"
out = ipywidgets.Output()
def print(*args, file=sys.stdout):
line = ' '.join(map(str, args)) + '\n'
if file is sys.stderr:
out.append_stderr(line)
else:
out.append_stdout(line)
def done(fut: asyncio.Future):
try:
result = fut.result()
except asyncio.CancelledError:
print("cancelled", fut, file=sys.stderr)
except Exception as err:
print("failed", fut, file=sys.stderr)
else:
print("completed", fut)
async def go():
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
threadpool,
lambda: fn(print, *args, **kwds),
)
task = asyncio.create_task(go())
task.add_done_callback(done)
return out
这个想法是用一些非异步/阻塞代码调用
run
,例如:
import time
def cpu_bound(print, dt, fail=False):
for i in range(10):
time.sleep(dt)
print(i, time.time())
if fail:
1 / 0
return "done"
run(cpu_bound, 0.1)
进来的
print
是为了让输出能够到达正确的位置,Output
小部件似乎依赖于 asyncio 魔法来让“真正的”print
到达正确的位置。它添加了一个完成回调,以便它可以在完成时输出一些内容,就像失败的方法一样:
run(cpu_bound, 0.5, fail=True)
每个单元中都有一些运行
run(cpu_bound, 1)
的单元,看起来很整洁。输出只是在处理时累积,希望这与您的预期有些相似!