我想使用 joblib 并行运行一个函数,并等待所有并行节点完成。就像例子中的那样:
from math import sqrt
from joblib import Parallel, delayed
Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))
但是,我希望执行将在单个进度条中看到,就像 tqdm 一样,显示已完成多少作业。
你会怎么做?
只需将
range(10)
放入tqdm(...)
即可!这对你来说可能看起来好得令人难以置信,但它确实有效(在我的机器上):
from math import sqrt
from joblib import Parallel, delayed
from tqdm import tqdm
result = Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in tqdm(range(100000)))
我创建了 pqdm 一个具有并发 future 的并行 tqdm 包装器,可以轻松地完成此任务,请尝试一下!
安装
pip install pqdm
并使用
from pqdm.processes import pqdm
# If you want threads instead:
# from pqdm.threads import pqdm
args = [1, 2, 3, 4, 5]
# args = range(1,6) would also work
def square(a):
return a*a
result = pqdm(args, square, n_jobs=2)
修改nth的伟大答案以允许动态标志是否使用TQDM,并提前指定总时间,以便正确填充状态栏。
from tqdm.auto import tqdm
from joblib import Parallel
class ProgressParallel(Parallel):
def __init__(self, use_tqdm=True, total=None, *args, **kwargs):
self._use_tqdm = use_tqdm
self._total = total
super().__init__(*args, **kwargs)
def __call__(self, *args, **kwargs):
with tqdm(disable=not self._use_tqdm, total=self._total) as self._pbar:
return Parallel.__call__(self, *args, **kwargs)
def print_progress(self):
if self._total is None:
self._pbar.total = self.n_dispatched_tasks
self._pbar.n = self.n_completed_tasks
self._pbar.refresh()
如上所述,简单包装传递给
joblib.Parallel()
的可迭代的解决方案并不能真正监控执行进度。相反,我建议子类化 Parallel
并重写 print_progress()
方法,如下所示:
import joblib
from tqdm.auto import tqdm
class ProgressParallel(joblib.Parallel):
def __call__(self, *args, **kwargs):
with tqdm() as self._pbar:
return joblib.Parallel.__call__(self, *args, **kwargs)
def print_progress(self):
self._pbar.total = self.n_dispatched_tasks
self._pbar.n = self.n_completed_tasks
self._pbar.refresh()
无需安装额外的软件包。您可以在 contrib.concurrent 中使用 tqdm 的本机支持: https://tqdm.github.io/docs/contrib.concurrent/
from tqdm.contrib.concurrent import process_map
# If you want threads instead:
# from tqdm.contrib.concurrent import thread_map
import time
args = range(5)
def square(a):
time.sleep(a)
return a*a
result = process_map(square, args, max_workers=2)
这是可能的解决方法
def func(x):
time.sleep(random.randint(1, 10))
return x
def text_progessbar(seq, total=None):
step = 1
tick = time.time()
while True:
time_diff = time.time()-tick
avg_speed = time_diff/step
total_str = 'of %n' % total if total else ''
print('step', step, '%.2f' % time_diff,
'avg: %.2f iter/sec' % avg_speed, total_str)
step += 1
yield next(seq)
all_bar_funcs = {
'tqdm': lambda args: lambda x: tqdm(x, **args),
'txt': lambda args: lambda x: text_progessbar(x, **args),
'False': lambda args: iter,
'None': lambda args: iter,
}
def ParallelExecutor(use_bar='tqdm', **joblib_args):
def aprun(bar=use_bar, **tq_args):
def tmp(op_iter):
if str(bar) in all_bar_funcs.keys():
bar_func = all_bar_funcs[str(bar)](tq_args)
else:
raise ValueError("Value %s not supported as bar type"%bar)
return Parallel(**joblib_args)(bar_func(op_iter))
return tmp
return aprun
aprun = ParallelExecutor(n_jobs=5)
a1 = aprun(total=25)(delayed(func)(i ** 2 + j) for i in range(5) for j in range(5))
a2 = aprun(total=16)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar='txt')(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar=None)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
我创建了 tqdm_joblib 来解决这个问题。
安装:
pip install tqdm-joblib
自述文件:
从https://stackoverflow.com/a/58936697/5133167复制的简单片段
打包以方便重用。from joblib import Parallel, delayed
from tqdm_joblib import tqdm_joblib
with tqdm_joblib(desc="My calculation", total=10) as progress_bar:
Parallel(n_jobs=16)(delayed(sqrt)(i**2) for i in range(10))
如果您的问题由许多部分组成,您可以将这些部分分成
k
子组,并行运行每个子组并更新其间的进度条,从而导致
k
更新进度。文档中的以下示例对此进行了演示。
>>> with Parallel(n_jobs=2) as parallel:
... accumulator = 0.
... n_iter = 0
... while accumulator < 1000:
... results = parallel(delayed(sqrt)(accumulator + i ** 2)
... for i in range(5))
... accumulator += sum(results) # synchronization barrier
... n_iter += 1
https://pythonhosted.org/joblib/parallel.html#reusing-a-pool-of-workers
import contextlib
import joblib
from tqdm import tqdm
@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
"""Context manager to patch joblib to report into tqdm progress bar given as argument"""
class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack):
def __call__(self, *args, **kwargs):
tqdm_object.update(n=self.batch_size)
return super().__call__(*args, **kwargs)
old_batch_callback = joblib.parallel.BatchCompletionCallBack
joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
try:
yield tqdm_object
finally:
joblib.parallel.BatchCompletionCallBack = old_batch_callback
tqdm_object.close()
然后包装为上下文管理器
from math import sqrt
from joblib import Parallel, delayed
with tqdm_joblib(tqdm(desc="My calculation", total=10)) as progress_bar:
Parallel(n_jobs=16)(delayed(sqrt)(i**2) for i in range(10))
适用于版本:
从此复制粘贴答案
从 2023 年 6 月发布的 joblib v1.3.0 开始,有一种更简单的方法可以用 tqdm 进度条包装
joblib.Parallel
(受到 此评论的启发)。
此进度条将跟踪作业完成情况,而不是作业排队情况。以前这需要一个特殊的上下文管理器。这是一个例子:
from joblib import Parallel, delayed
from tqdm import tqdm
import time
import random
# Our example worker will sleep for a certain number of seconds.
inputs = list(range(10))
random.shuffle(inputs)
def worker(n_seconds):
time.sleep(n_seconds)
return n_seconds
# Run the worker jobs in parallel, with a tqdm progress bar.
# We configure Parallel to return a generator.
# Then we wrap the generator in tqdm.
# Finally, we execute everything by converting the tqdm generator to a list.
outputs = list(
tqdm(
# Note the new return_as argument here, which requires joblib >= 1.3:
Parallel(return_as="generator", n_jobs=3)(
delayed(worker)(n_seconds) for n_seconds in inputs
),
total=len(inputs),
)
)
print(outputs)