为 Jupyter Notebook 实现并行评估队列

问题描述 投票:0回答:1

我想在 Jupyter 笔记本中并行执行长时间运行的函数,而不阻塞笔记本本身。但是,我还想随时将完全不相关的作业添加到队列中,即使队列仍在处理以前的作业。基本上,这类似于用于计算集群的作业调度程序的极其简单的版本。

我目前的实现是这样的。首先创建一个工作池和一个作业队列。

import multiprocessing
def worker(job_queue, result_queue):
    while True:
        job = job_queue.get()
        function, args, kwargs = job
        result = function(*args, **kwargs)
        result_queue.put(result)
job_queue = multiprocessing.Queue()
result_queue = multiprocessing.Queue()
pool = multiprocessing.Pool(processes=2, initializer=worker, initargs=[job_queue, result_queue])

后来,在一个单独的单元格中,我们定义了一个需要很长时间才能计算的函数,因此我们希望在不阻塞笔记本的情况下运行它

def slow_function(x):
    import time
    time.sleep(3)
    return x+1

然后我们将多个作业发送到队列以在后台并行执行

job_queue.put([slow_function, [1], {}])
job_queue.put([slow_function, [2], {}])

然后,当之前的作业仍在运行时,我们定义另一个函数,并将其也添加到队列中

def complex_function(x):
    return slow_function(x)*2
job_queue.put([complex_function, [1], {}])
job_queue.put([complex_function, [2], {}])

我们继续在Jupyter笔记本中运行其他东西一段时间,然后在所有作业完成后,我们得到结果

result1 = result_queue.get(False)
result2 = result_queue.get(False)
result3 = result_queue.get(False)
result4 = result_queue.get(False)

这是一般工作流程,我希望能够做到。

问题

按照实施方式,创建池后,工作进程立即崩溃并在无限循环中重新启动,并出现 pickle 错误

AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\yy51\AppData\Local\Programs\Miniconda3\Lib\multiprocessing\spawn.py", line 122, in spawn_main
    exitcode = _main(fd, parent_sentinel)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\yy51\AppData\Local\Programs\Miniconda3\Lib\multiprocessing\spawn.py", line 132, in _main
    self = reduction.pickle.load(from_parent)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

如果我切换到使用

pathos
使用
pool = pathos.multiprocessing.Pool(processes=2, initializer=worker, initargs=[job_queue, result_queue])
,那么我会得到
RuntimeError: Queue objects should only be shared between processes through inheritance

如果我将

multiprocessing.Queue
替换为
pathos
队列
pathos.helpers.mp.Queue

,我就可以让它工作了
job_queue = pathos.helpers.mp.Queue()
result_queue = pathos.helpers.mp.Queue()

但是如果我尝试在

complex_function

中调用另一个定义的函数,它仍然不起作用
job_queue.put([complex_function, [1], {}])

NameError: name 'slow_function' is not defined

限制

我也真的、真的不想创建一个单独的文件来在模块中托管

slow_function
complex_function
,因为这对于交互式笔记本使用来说非常不方便。

注意:我在 Windows 上运行此程序。

python jupyter-notebook pickle python-multiprocessing
1个回答
0
投票

玩过一次,这就是我的评论:

import sys
import asyncio
import concurrent.futures

import ipywidgets

threadpool = concurrent.futures.ThreadPoolExecutor(4)

def run(fn, *args, **kwds):
    "run fn in threadpool"
    out = ipywidgets.Output()

    def print(*args, file=sys.stdout):
        line = ' '.join(map(str, args)) + '\n'
        if file is sys.stderr:
            out.append_stderr(line)
        else:
            out.append_stdout(line)

    def done(fut: asyncio.Future):
        try:
            result = fut.result()
        except asyncio.CancelledError:
            print("cancelled", fut, file=sys.stderr)
        except Exception as err:
            print("failed", fut, file=sys.stderr)
        else:
            print("completed", fut)

    async def go():
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            threadpool,
            lambda: fn(print, *args, **kwds),
        )

    task = asyncio.create_task(go())
    task.add_done_callback(done)

    return out

这个想法是用一些非异步/阻塞代码调用

run
,例如:

import time

def cpu_bound(print, dt, fail=False):
    for i in range(10):
        time.sleep(dt)
        print(i, time.time())
    if fail:
        1 / 0
    return "done"

run(cpu_bound, 0.1)

进来的

print
是为了让输出能够到达正确的位置,
Output
小部件似乎依赖于 asyncio 魔法来让“真正的”
print
到达正确的位置。它添加了一个完成回调,以便它可以在完成时输出一些内容,就像失败的方法一样:

run(cpu_bound, 0.5, fail=True)

每个单元中都有一些运行

run(cpu_bound, 1)
的单元,看起来很整洁。输出只是在处理时累积,希望这与您的预期有些相似!

© www.soinside.com 2019 - 2024. All rights reserved.