我有三个函数,使用papermill执行3个不同的jupyter笔记本,我希望第一个函数(job1)和第二个函数(job2)同时运行,最后一个函数(job3)在第一个函数(job1)完成运行后才运行,而不会出现任何错误。我不知道为第二个函数创建一个新的线程是否合理,也不知道如何恰当地使用join()方法。我在Windows上运行,由于某些原因,concurrent.futures和多处理不能工作,这就是为什么我使用线程模块。
def job1():
return pm.execute_notebook('notebook1.ipynb',log_output=False)
def job2():
return pm.execute_notebook('notebook2.ipynb',log_output=False)
def job3():
return pm.execute_notebook('notebook3.ipynb',log_output=False)
t1 = threading.Thread(target = job1)
t2 = threading.Thread(target = job2)
t3 = threading.Thread(target = job3)
try:
t1.start()
t1.join()
t2.start()
except:
pass
finally:
t3.start()
我喜欢先将所需的流程可视化,我理解为是这样的。
这意味着t1和t2需要同时开始 然后你需要在这两个函数上进行连接
t1.start() # <- Started
t2.start() # <- Started
# t1 and t2 executing concurrently
t1.join()
t2.join()
# wait for both to finish
t3.start()
t3.join()
t1, t2的加入顺序并不重要 因为无论如何,你的程序必须等待运行时间最长的线程。如果t1先完成,它会阻塞在t2上,如果t2先完成,它仍然需要等待t1,然后会在t2.join()上 "no-op"。
我写了一个程序来并行运行DAG,这正是你所需要的。这是作为一个开源项目的一部分完成的。该方案使用进程而不是线程,但你可以适应它。基本思路是有一个进程池,并跟踪每个任务的状态,每当任何一个任务完成时,就会发送下一个下一个任务。顺序是通过迭代DAG来决定的。拓扑顺序.
如果任何任务失败,它们所有的下流依赖都会被中止。程序会继续运行,直到没有更多的任务要运行,代码在这里。https:/github.comploomberploomberblob0.4.1srcploomberexecutorsParallel.py。
我看你是想在papermill中运行笔记本,我开发的工具正是支持这样的功能:它可以运行任意复杂的管道(其中任务可以是参数化的Jupyter笔记本)。看起来对于你的用例来说是个不错的选择。https:/github.comploomberploomber。