在同一线程中以特定顺序运行多个函数。

问题描述 投票:0回答:1

我有三个函数,使用papermill执行3个不同的jupyter笔记本,我希望第一个函数(job1)和第二个函数(job2)同时运行,最后一个函数(job3)在第一个函数(job1)完成运行后才运行,而不会出现任何错误。我不知道为第二个函数创建一个新的线程是否合理,也不知道如何恰当地使用join()方法。我在Windows上运行,由于某些原因,concurrent.futures和多处理不能工作,这就是为什么我使用线程模块。

def job1():

    return pm.execute_notebook('notebook1.ipynb',log_output=False)

def job2():

     return pm.execute_notebook('notebook2.ipynb',log_output=False)

def job3():

     return pm.execute_notebook('notebook3.ipynb',log_output=False)


t1 = threading.Thread(target = job1)
t2 = threading.Thread(target = job2)
t3 = threading.Thread(target = job3)


try:
   t1.start()
   t1.join()
   t2.start()

except:
   pass

finally:

   t3.start()
python concurrency jupyter-notebook python-multithreading papermill
1个回答
0
投票

我喜欢先将所需的流程可视化,我理解为是这样的。

enter image description here

这意味着t1和t2需要同时开始 然后你需要在这两个函数上进行连接

   t1.start() # <- Started 
   t2.start() # <- Started
   # t1 and t2 executing concurrently

   t1.join()
   t2.join()
   # wait for both to finish

   t3.start()
   t3.join()

t1, t2的加入顺序并不重要 因为无论如何,你的程序必须等待运行时间最长的线程。如果t1先完成,它会阻塞在t2上,如果t2先完成,它仍然需要等待t1,然后会在t2.join()上 "no-op"。


0
投票

我写了一个程序来并行运行DAG,这正是你所需要的。这是作为一个开源项目的一部分完成的。该方案使用进程而不是线程,但你可以适应它。基本思路是有一个进程池,并跟踪每个任务的状态,每当任何一个任务完成时,就会发送下一个下一个任务。顺序是通过迭代DAG来决定的。拓扑顺序.

如果任何任务失败,它们所有的下流依赖都会被中止。程序会继续运行,直到没有更多的任务要运行,代码在这里。https:/github.comploomberploomberblob0.4.1srcploomberexecutorsParallel.py。

我看你是想在papermill中运行笔记本,我开发的工具正是支持这样的功能:它可以运行任意复杂的管道(其中任务可以是参数化的Jupyter笔记本)。看起来对于你的用例来说是个不错的选择。https:/github.comploomberploomber。

© www.soinside.com 2019 - 2024. All rights reserved.