如何在Python中使用多处理时显示进度条(tqdm)?

问题描述 投票:0回答:2

我有以下代码,其中

create_data()
指的是我之前已经定义的函数。

%%time
from tqdm import tqdm
from multiprocessing import Pool
import pandas as pd
import os

with Pool(processes=os.cpu_count()) as pool:
    results = pool.map(create_data, date)
    data = [ent for sublist in results for ent in sublist]
    data = pd.DataFrame(data, columns = cols)
    data.to_csv("%s"%str(date), index=False)

我基本上想在传递日期参数的同时调用

create_data()
。然后获得的所有结果将被收集到
results
变量中。然后我会将它们全部合并到一个列表中并将其转换为数据框。函数
create_data
的计算量相当大,因此需要很长时间来计算。这就是为什么我需要进度条来查看进程。

我尝试将其更改为以下内容。

results = list(tqdm(pool.map(create_od, date), total = os.cpu_count()))

但似乎不起作用。我已经等了很长一段时间了,没有进度条出现。我在这里该怎么办?

python multiprocessing progress-bar tqdm
2个回答
8
投票

cf multiprocessing.Pool.map

它会阻塞,直到结果准备好为止

tqdm.tqdm

装饰一个可迭代对象,返回一个迭代器,其行为与原始可迭代对象完全相同,但每次请求值时都会打印动态更新的进度条。

因此,在

map
被调用之前,
tqdm
ping 已完全完成。

我用这段代码复制了:

from time import sleep
from tqdm import tqdm
from multiprocessing import Pool


def crunch(numbers):
    print(numbers)
    sleep(2)


if __name__ == "__main__":
    with Pool(processes=4) as pool:
        print("mapping ...")
        results = tqdm(pool.map(crunch, range(40)), total=40)
        print("done")

打印:

mapping ...
0
3
6
[...]
37
38
  0%|          | 0/40 [00:00<?, ?it/s]done

相反,您应该使用 lazy 版本

multiprocessing.Pool.imap
:它将立即返回一个生成器,您必须迭代该生成器才能获得实际结果,可以将其包装在
tqdm
中。

from time import sleep
from multiprocessing import Pool

from tqdm import tqdm


def crunch(numbers):
    # print(numbers)  # commented out to not mess the tqdm output
    sleep(2)


if __name__ == "__main__":
    with Pool(processes=4) as pool:
        print("mapping ...")
        results = tqdm(pool.imap(crunch, range(40)), total=40)
        print("running ...")
        tuple(results)  # fetch the lazy results
        print("done")

打印:

mapping ...
running ...
  0%|          | 0/40 [00:00<?, ?it/s]
  2%|▎         | 1/40 [00:02<01:35,  2.45s/it]
 12%|█▎        | 5/40 [00:04<00:27,  1.26it/s]
 22%|██▎       | 9/40 [00:06<00:19,  1.58it/s]
 32%|███▎      | 13/40 [00:08<00:15,  1.74it/s]
 42%|████▎     | 17/40 [00:10<00:12,  1.83it/s]
 52%|█████▎    | 21/40 [00:12<00:10,  1.89it/s]
 62%|██████▎   | 25/40 [00:14<00:07,  1.92it/s]
 72%|███████▎  | 29/40 [00:16<00:05,  1.95it/s]
 82%|████████▎ | 33/40 [00:18<00:03,  1.96it/s]
100%|██████████| 40/40 [00:20<00:00,  1.95it/s]
done

(进度条位于多行,因为我的 Windows 终端上的 PyCharm 不支持

\r
,但在你的终端上应该可以正常工作)


2
投票

更新

参见@Lenormnu的回答,他走在正确的轨道上。

然而,

imap
方法的问题是它保证按照参数的顺序返回结果。因此,如果
date
列表的第一个元素的处理时间非常长(它是最后一个完成的任务),则进度条将不会前进,直到该任务完成。但到那时所有其他提交的任务都已完成,进度条将立即跳至 100%。诚然,这不太可能发生。但如果能按完成顺序处理结果就更好了。可以使用
imap_unordered
,但要按照任务提交的顺序恢复结果需要您首先修改
create_data
函数。

如果您使用

ProcessPoolExecutor
模块中的
concurrent.futures
类进行多处理,实际上根本不需要修改
create_data
函数,因为方法
submit
创建了
Future
实例,您可以将其作为键存储在字典中其值为索引,然后将
as_completed
方法应用于字典,以按完成顺序返回已完成的任务并恢复索引。在这里,我更聪明一点,所以不需要排序:

import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed
import pandas as pd
import os

with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
    # total argument for tqdm is just the number of submitted tasks:
    with tqdm.tqdm(total=len(date)) as progress_bar:
        futures = {}
        for idx, dt in enumerate(date):
            future = executor.submit(create_data, dt)
            futures[future] = idx
        results = [None] * len(date) # pre_allocate slots
        for future in as_completed(futures):
            idx = futures[future] # order of submission
            results[idx] = future.result()
            progress_bar.update(1) # advance by 1
    data = [ent for sublist in results for ent in sublist]
    data = pd.DataFrame(data, columns = cols)
© www.soinside.com 2019 - 2024. All rights reserved.