我有以下代码,其中
create_data()
指的是我之前已经定义的函数。
%%time
from tqdm import tqdm
from multiprocessing import Pool
import pandas as pd
import os
with Pool(processes=os.cpu_count()) as pool:
results = pool.map(create_data, date)
data = [ent for sublist in results for ent in sublist]
data = pd.DataFrame(data, columns = cols)
data.to_csv("%s"%str(date), index=False)
我基本上想在传递日期参数的同时调用
create_data()
。然后获得的所有结果将被收集到 results
变量中。然后我会将它们全部合并到一个列表中并将其转换为数据框。函数create_data
的计算量相当大,因此需要很长时间来计算。这就是为什么我需要进度条来查看进程。
我尝试将其更改为以下内容。
results = list(tqdm(pool.map(create_od, date), total = os.cpu_count()))
但似乎不起作用。我已经等了很长一段时间了,没有进度条出现。我在这里该怎么办?
它会阻塞,直到结果准备好为止
装饰一个可迭代对象,返回一个迭代器,其行为与原始可迭代对象完全相同,但每次请求值时都会打印动态更新的进度条。
因此,在
map
被调用之前,tqdm
ping 已完全完成。
我用这段代码复制了:
from time import sleep
from tqdm import tqdm
from multiprocessing import Pool
def crunch(numbers):
print(numbers)
sleep(2)
if __name__ == "__main__":
with Pool(processes=4) as pool:
print("mapping ...")
results = tqdm(pool.map(crunch, range(40)), total=40)
print("done")
打印:
mapping ...
0
3
6
[...]
37
38
0%| | 0/40 [00:00<?, ?it/s]done
multiprocessing.Pool.imap
:它将立即返回一个生成器,您必须迭代该生成器才能获得实际结果,可以将其包装在 tqdm
中。
from time import sleep
from multiprocessing import Pool
from tqdm import tqdm
def crunch(numbers):
# print(numbers) # commented out to not mess the tqdm output
sleep(2)
if __name__ == "__main__":
with Pool(processes=4) as pool:
print("mapping ...")
results = tqdm(pool.imap(crunch, range(40)), total=40)
print("running ...")
tuple(results) # fetch the lazy results
print("done")
打印:
mapping ...
running ...
0%| | 0/40 [00:00<?, ?it/s]
2%|▎ | 1/40 [00:02<01:35, 2.45s/it]
12%|█▎ | 5/40 [00:04<00:27, 1.26it/s]
22%|██▎ | 9/40 [00:06<00:19, 1.58it/s]
32%|███▎ | 13/40 [00:08<00:15, 1.74it/s]
42%|████▎ | 17/40 [00:10<00:12, 1.83it/s]
52%|█████▎ | 21/40 [00:12<00:10, 1.89it/s]
62%|██████▎ | 25/40 [00:14<00:07, 1.92it/s]
72%|███████▎ | 29/40 [00:16<00:05, 1.95it/s]
82%|████████▎ | 33/40 [00:18<00:03, 1.96it/s]
100%|██████████| 40/40 [00:20<00:00, 1.95it/s]
done
(进度条位于多行,因为我的 Windows 终端上的 PyCharm 不支持
\r
,但在你的终端上应该可以正常工作)
更新
参见@Lenormnu的回答,他走在正确的轨道上。
然而,
imap
方法的问题是它保证按照参数的顺序返回结果。因此,如果 date
列表的第一个元素的处理时间非常长(它是最后一个完成的任务),则进度条将不会前进,直到该任务完成。但到那时所有其他提交的任务都已完成,进度条将立即跳至 100%。诚然,这不太可能发生。但如果能按完成顺序处理结果就更好了。可以使用 imap_unordered
,但要按照任务提交的顺序恢复结果需要您首先修改 create_data
函数。
如果您使用
ProcessPoolExecutor
模块中的 concurrent.futures
类进行多处理,实际上根本不需要修改 create_data
函数,因为方法 submit
创建了 Future
实例,您可以将其作为键存储在字典中其值为索引,然后将 as_completed
方法应用于字典,以按完成顺序返回已完成的任务并恢复索引。在这里,我更聪明一点,所以不需要排序:
import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed
import pandas as pd
import os
with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
# total argument for tqdm is just the number of submitted tasks:
with tqdm.tqdm(total=len(date)) as progress_bar:
futures = {}
for idx, dt in enumerate(date):
future = executor.submit(create_data, dt)
futures[future] = idx
results = [None] * len(date) # pre_allocate slots
for future in as_completed(futures):
idx = futures[future] # order of submission
results[idx] = future.result()
progress_bar.update(1) # advance by 1
data = [ent for sublist in results for ent in sublist]
data = pd.DataFrame(data, columns = cols)