How can we use tqdm with joblib's parallel execution?

Problem description · 0 votes · 10 answers

I want to run a function in parallel with joblib and wait until all of the parallel workers have finished, as in this example:

from math import sqrt
from joblib import Parallel, delayed
Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))

However, I would like to watch the execution in a single progress bar, like tqdm's, showing how many jobs have completed.

How would you do this?

python parallel-processing joblib tqdm
10 Answers
59 votes

Just wrap range(10) in tqdm(...)! This may look too good to be true, but it really works (on my machine):

from math import sqrt
from joblib import Parallel, delayed  
from tqdm import tqdm  
result = Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in tqdm(range(100000)))

50 votes

I created pqdm, a parallel tqdm wrapper built on concurrent.futures, which makes this easy to do. Give it a try!

Installation

pip install pqdm

Usage

from pqdm.processes import pqdm
# If you want threads instead:
# from pqdm.threads import pqdm

args = [1, 2, 3, 4, 5]
# args = range(1,6) would also work

def square(a):
    return a*a

result = pqdm(args, square, n_jobs=2)

26 votes

A modification of nth's great answer that adds a flag to dynamically enable or disable tqdm, and lets you specify the total in advance so that the progress bar fills in correctly.

from tqdm.auto import tqdm
from joblib import Parallel

class ProgressParallel(Parallel):
    def __init__(self, use_tqdm=True, total=None, *args, **kwargs):
        self._use_tqdm = use_tqdm
        self._total = total
        super().__init__(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        with tqdm(disable=not self._use_tqdm, total=self._total) as self._pbar:
            return Parallel.__call__(self, *args, **kwargs)

    def print_progress(self):
        if self._total is None:
            self._pbar.total = self.n_dispatched_tasks
        self._pbar.n = self.n_completed_tasks
        self._pbar.refresh()
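A minimal usage sketch of this class (the class body is repeated so the snippet runs standalone; the n_jobs and total values are illustrative, not from the original answer):

```python
from math import sqrt

from joblib import Parallel, delayed
from tqdm.auto import tqdm

# The class from the answer above, repeated so this sketch runs on its own.
class ProgressParallel(Parallel):
    def __init__(self, use_tqdm=True, total=None, *args, **kwargs):
        self._use_tqdm = use_tqdm
        self._total = total
        super().__init__(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        with tqdm(disable=not self._use_tqdm, total=self._total) as self._pbar:
            return Parallel.__call__(self, *args, **kwargs)

    def print_progress(self):
        if self._total is None:
            self._pbar.total = self.n_dispatched_tasks
        self._pbar.n = self.n_completed_tasks
        self._pbar.refresh()

# Extra keyword arguments (use_tqdm, total) ride alongside joblib's usual ones.
result = ProgressParallel(n_jobs=2, total=100)(
    delayed(sqrt)(i ** 2) for i in range(100)
)
```

Passing use_tqdm=False silences the bar without touching the call sites, which is convenient when the same code runs both interactively and in batch jobs.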

18 votes

As mentioned above, solutions that simply wrap the iterable passed to joblib.Parallel() do not truly monitor the progress of execution. Instead, I suggest subclassing Parallel and overriding the print_progress() method, as follows:

import joblib
from tqdm.auto import tqdm

class ProgressParallel(joblib.Parallel):
    def __call__(self, *args, **kwargs):
        with tqdm() as self._pbar:
            return joblib.Parallel.__call__(self, *args, **kwargs)

    def print_progress(self):
        self._pbar.total = self.n_dispatched_tasks
        self._pbar.n = self.n_completed_tasks
        self._pbar.refresh()

14 votes

No extra packages need to be installed. You can use tqdm's native support in contrib.concurrent: https://tqdm.github.io/docs/contrib.concurrent/

from tqdm.contrib.concurrent import process_map
# If you want threads instead:
# from tqdm.contrib.concurrent import thread_map
import time

args = range(5)

def square(a):
    time.sleep(a)
    return a*a

result = process_map(square, args, max_workers=2)

7 votes

Here is a possible workaround:

import random
import time

from joblib import Parallel, delayed
from tqdm import tqdm

def func(x):
    time.sleep(random.randint(1, 10))
    return x

def text_progressbar(seq, total=None):
    step = 1
    tick = time.time()
    for item in seq:
        time_diff = time.time() - tick
        avg_speed = time_diff / step
        total_str = 'of %d' % total if total else ''
        print('step', step, '%.2f' % time_diff,
              'avg: %.2f sec/iter' % avg_speed, total_str)
        step += 1
        yield item

all_bar_funcs = {
    'tqdm': lambda args: lambda x: tqdm(x, **args),
    'txt': lambda args: lambda x: text_progressbar(x, **args),
    'False': lambda args: iter,
    'None': lambda args: iter,
}

def ParallelExecutor(use_bar='tqdm', **joblib_args):
    def aprun(bar=use_bar, **tq_args):
        def tmp(op_iter):
            if str(bar) in all_bar_funcs.keys():
                bar_func = all_bar_funcs[str(bar)](tq_args)
            else:
                raise ValueError("Value %s not supported as bar type"%bar)
            return Parallel(**joblib_args)(bar_func(op_iter))
        return tmp
    return aprun

aprun = ParallelExecutor(n_jobs=5)

a1 = aprun(total=25)(delayed(func)(i ** 2 + j) for i in range(5) for j in range(5))
a2 = aprun(total=16)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar='txt')(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar=None)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))

6 votes

I created tqdm_joblib to solve this problem.

Installation:

pip install tqdm-joblib

From the README:

A simple snippet copied from https://stackoverflow.com/a/58936697/5133167, packaged for easy reuse.
from math import sqrt

from joblib import Parallel, delayed
from tqdm_joblib import tqdm_joblib

with tqdm_joblib(desc="My calculation", total=10) as progress_bar:
    Parallel(n_jobs=16)(delayed(sqrt)(i**2) for i in range(10))

2 votes

If your problem consists of many parts, you can split the parts into k subgroups, run each subgroup in parallel, and update the progress bar in between, resulting in k progress updates.

The following example from the documentation demonstrates this.

>>> with Parallel(n_jobs=2) as parallel:
...     accumulator = 0.
...     n_iter = 0
...     while accumulator < 1000:
...         results = parallel(delayed(sqrt)(accumulator + i ** 2)
...                            for i in range(5))
...         accumulator += sum(results)  # synchronization barrier
...         n_iter += 1

https://pythonhosted.org/joblib/parallel.html#reusing-a-pool-of-workers
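The k-subgroup idea above can be sketched as follows; the chunk size and n_jobs are illustrative assumptions, not from the original answer:

```python
from math import sqrt

from joblib import Parallel, delayed
from tqdm import tqdm

items = list(range(100))
chunk_size = 10
# Split the work into k contiguous subgroups.
chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

results = []
# Reuse one pool of workers across chunks; update the bar once per chunk.
with Parallel(n_jobs=2) as parallel, tqdm(total=len(chunks)) as pbar:
    for chunk in chunks:
        results.extend(parallel(delayed(sqrt)(x) for x in chunk))
        pbar.update(1)
```

The bar only advances k times, so it is coarser than per-job tracking, but it needs no patching of joblib internals.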


1 vote

None of the other answers (including the classes from user394430 and nth) worked for me.

But this answer to a similar question works great. Reposting it here for convenience.

import contextlib
import joblib
from tqdm import tqdm

@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
    """Context manager to patch joblib to report into tqdm progress bar given as argument"""
    class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack):
        def __call__(self, *args, **kwargs):
            tqdm_object.update(n=self.batch_size)
            return super().__call__(*args, **kwargs)

    old_batch_callback = joblib.parallel.BatchCompletionCallBack
    joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
    try:
        yield tqdm_object
    finally:
        joblib.parallel.BatchCompletionCallBack = old_batch_callback
        tqdm_object.close()

Then use it as a context manager:

from math import sqrt
from joblib import Parallel, delayed

with tqdm_joblib(tqdm(desc="My calculation", total=10)) as progress_bar:
    Parallel(n_jobs=16)(delayed(sqrt)(i**2) for i in range(10))

Works with versions:

  • joblib - 1.2.0
  • tqdm - 4.64.1
  • python - 3.9.13

0 votes

Copied and pasted from this answer.


As of joblib v1.3.0, released in June 2023, there is a simpler way to wrap joblib.Parallel with a tqdm progress bar (inspired by this comment).

This progress bar tracks job completion rather than job enqueueing. Previously, that required a special context manager. Here is an example:

from joblib import Parallel, delayed
from tqdm import tqdm

import time
import random

# Our example worker will sleep for a certain number of seconds.

inputs = list(range(10))
random.shuffle(inputs)

def worker(n_seconds):
    time.sleep(n_seconds)
    return n_seconds

# Run the worker jobs in parallel, with a tqdm progress bar.
# We configure Parallel to return a generator.
# Then we wrap the generator in tqdm.
# Finally, we execute everything by converting the tqdm generator to a list.

outputs = list(
    tqdm(
        # Note the new return_as argument here, which requires joblib >= 1.3:
        Parallel(return_as="generator", n_jobs=3)(
            delayed(worker)(n_seconds) for n_seconds in inputs
        ),
        total=len(inputs),
    )
)
print(outputs)