I'm writing a program that processes files that can sometimes be very large (GB range), and it takes a long time. At first I tried ThreadPoolExecutor, which helped somewhat: a ~200 MB file takes about 3 minutes synchronously, versus roughly 130 seconds with ThreadPoolExecutor. That was still too slow for me, so I tried ProcessPoolExecutor, and it works very well: the same job finishes in about 12-18 seconds. That makes sense, since the task is CPU-heavy. The problem now is how to visualize the task's progress. I'm using tqdm. With threads, everything runs perfectly and I get a nice progress bar. But when I switch to a process pool, the program crashes.
My code looks like this:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from pathlib import Path
from tqdm import tqdm
import os
import time


class FileProcessor:
    NJOBS = 8  # 4 for multiprocessing
    CHK_SIZE = 1024 * 1024 * 80  # 80 MB

    def __init__(self, fp: str | Path):
        self.filepath = Path(fp)
        self.filesize = os.path.getsize(self.filepath)

    @staticmethod
    def _process_chunk(chunk: bytes, pb: tqdm, *somemoreargs):
        # processes each byte, updates the progress bar afterwards
        array = bytearray(chunk)
        for i in range(len(array)):
            # do something with the byte at i
            time.sleep(0.0001)
            if pb:
                pb.update()
        return bytes(array)  # and some more vals

    def perform(self):
        def subchunk(chunk: bytes):
            schk_size = len(chunk) // FileProcessor.NJOBS
            if not schk_size:
                schk_size = len(chunk)  # will work on this later
            i = 0
            while (schunk := chunk[i:i + schk_size]):
                yield schunk  # and some more info
                i += schk_size

        progressbar = tqdm(range(self.filesize))
        file = self.filepath.open(mode="rb")
        executor = ThreadPoolExecutor(max_workers=FileProcessor.NJOBS)
        with progressbar, file, executor:
            while (chunk := file.read(FileProcessor.CHK_SIZE)):
                futures = [executor.submit(FileProcessor._process_chunk, sc, progressbar) for sc in subchunk(chunk)]
                for future in as_completed(futures):
                    # do something with results
                    pass
        # do final stuff
This works fine with multithreading; the progress bar fills smoothly. But when I change to multiprocessing, the program crashes. My guess is that this is because "processes do not share memory space".
So the question is: how can I display progress smoothly with tqdm while using multiprocessing? Right now I update the progress bar only after each subchunk finishes, inside
for future in as_completed(futures)
but the progress looks quite ugly, jumping in large steps.
You can spawn an extra process that is responsible for updating the progress bar. To send data to this process, you can use a
Queue()
.
Here is the modified example:
import os
import time
from concurrent.futures import (ProcessPoolExecutor, ThreadPoolExecutor,
                                as_completed)
from multiprocessing import Process, Queue
from pathlib import Path

from tqdm import tqdm


class FileProcessor:
    NJOBS = 8  # 4 for multiprocessing
    CHK_SIZE = 1024 * 1024 * 80  # 80 MB

    def __init__(self, fp: str | Path):
        self.filepath = Path(fp)
        self.filesize = os.path.getsize(self.filepath)

    @staticmethod
    def _process_chunk(chunk: bytes, queue, *somemoreargs):
        for i in chunk:
            # do something with byte i
            time.sleep(0.0001)
            queue.put_nowait(1)
        return "xxx"

    @staticmethod
    def _pb_updater(filesize, queue):
        # runs in its own (daemon) process and owns the tqdm bar
        progressbar = tqdm(total=filesize)
        while True:
            how_much_to_update = queue.get()
            progressbar.update(how_much_to_update)
            progressbar.refresh()

    def perform(self):
        def subchunk(chunk: bytes):
            schk_size = len(chunk) // FileProcessor.NJOBS
            if not schk_size:
                schk_size = len(chunk)  # will work on this later
            i = 0
            while schunk := chunk[i : i + schk_size]:
                yield schunk  # and some more info
                i += schk_size

        pb_queue = Queue()
        pb_updater = Process(
            target=FileProcessor._pb_updater,
            args=(self.filesize, pb_queue),
            daemon=True,
        )
        pb_updater.start()

        file = self.filepath.open(mode="rb")
        executor = ThreadPoolExecutor(max_workers=FileProcessor.NJOBS)
        with file, executor:
            while chunk := file.read(FileProcessor.CHK_SIZE):
                futures = [
                    executor.submit(FileProcessor._process_chunk, sc, pb_queue)
                    for sc in subchunk(chunk)
                ]
                for future in as_completed(futures):
                    _ = future.result()
        # do final stuff


if __name__ == "__main__":
    f = FileProcessor("some_big_file.tar")
    f.perform()
This prints:
0%| | 98333/12200314880 [00:04<153:34:42, 22066.56it/s]
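If you also want the workers themselves to run in separate processes (ProcessPoolExecutor instead of ThreadPoolExecutor), note that a plain multiprocessing.Queue cannot be passed through executor.submit(): it can only be shared by inheritance, so pickling it for a pool worker raises a RuntimeError. A Manager().Queue() proxy is picklable and works. Here is a minimal sketch of that variant; the names process_chunk, pb_updater, and run are illustrative, not taken from the code above:

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Manager, Process

from tqdm import tqdm


def process_chunk(chunk: bytes, queue) -> int:
    # report one unit of progress per processed byte
    for _ in chunk:
        queue.put_nowait(1)
    return len(chunk)


def pb_updater(total: int, queue) -> None:
    # dedicated process that owns the tqdm bar; stops on the None sentinel
    with tqdm(total=total) as bar:
        while (n := queue.get()) is not None:
            bar.update(n)


def run(data: bytes, njobs: int = 4) -> int:
    with Manager() as manager:
        queue = manager.Queue()  # proxy queue: picklable into pool workers
        updater = Process(target=pb_updater, args=(len(data), queue))
        updater.start()
        size = max(1, len(data) // njobs)
        done = 0
        with ProcessPoolExecutor(max_workers=njobs) as executor:
            futures = [
                executor.submit(process_chunk, data[i:i + size], queue)
                for i in range(0, len(data), size)
            ]
            for future in as_completed(futures):
                done += future.result()
        queue.put(None)  # tell the updater to finish
        updater.join()
        return done
```

Each put() on the proxy queue is an IPC round trip, so for real workloads you may want to batch the updates (e.g. one put per few thousand bytes) instead of one per byte.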