How to use tqdm with ProcessPoolExecutor

Problem description

I'm writing a program that processes files that can be very large (gigabytes), which takes a long time. At first I tried ThreadPoolExecutor, which sped things up somewhat: a ~200 MB file takes about 3 minutes when run synchronously, versus a bit over 130 seconds with ThreadPoolExecutor. That is still too slow for me. Then I tried ProcessPoolExecutor and it worked very well, finishing the same job in about 12-18 seconds. That makes sense, since the task is heavily CPU-bound. The problem now is how to visualize the task's progress. I use tqdm. With threads everything works beautifully and I can watch the bar fill nicely. But when I switch to a process pool, the program crashes.

My code looks like this:

import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from pathlib import Path
from tqdm import tqdm


class FileProcessor:
    NJOBS = 8 # 4 for multiprocessing
    CHK_SIZE = 1024 * 1024 * 80 # 80 MB


    def __init__(self, fp: str | Path):
        self.filepath = Path(fp)
        self.filesize = os.path.getsize(self.filepath)

    @staticmethod
    def _process_chunk(chunk: bytes, pb: tqdm, *somemoreargs):
        # processes each byte, updates progressbar afterwards
        array = bytearray(chunk)
        for i in range(len(array)):
            # do something with the byte at i
            time.sleep(0.0001)
            if pb:
                pb.update()
        return bytes(array) # and some more vals

    def perform(self):
        def subchunk(chunk: bytes):
            schk_size = len(chunk) // FileProcessor.NJOBS
            if not schk_size:
                schk_size = len(chunk) # will work on this later
            i = 0
            while (schunk := chunk[i:i + schk_size]):
                yield schunk # and some more info
                i += schk_size

        progressbar = tqdm(total=self.filesize)
        file = self.filepath.open(mode="rb")
        executor = ThreadPoolExecutor(max_workers=FileProcessor.NJOBS)
        with progressbar, file, executor:
            while (chunk := file.read(FileProcessor.CHK_SIZE)):
                futures = [executor.submit(FileProcessor._process_chunk, sc, progressbar) for sc in subchunk(chunk)]
                for future in as_completed(futures):
                    # do something with results
                    pass
            # do final stuff

This works fine with multithreading; the progress bar fills smoothly. But when I switch to multiprocessing, the program crashes. My guess is that it is because "processes don't share memory space".
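I suspect the more specific reason (my assumption, I haven't dug into the traceback) is that every argument handed to ProcessPoolExecutor.submit must be pickled, and a tqdm instance wraps thread locks and an output stream that pickle rejects. A quick check along these lines fails:

import pickle
from tqdm import tqdm

pb = tqdm(total=100)
try:
    pickle.dumps(pb)  # ProcessPoolExecutor pickles submitted arguments the same way
except TypeError as exc:
    print(exc)  # e.g. "cannot pickle '_thread.lock' object"
pb.close()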

So the question is: how can I get tqdm to display progress smoothly while using multiprocessing? For now I update the progress bar only after each sub-chunk finishes, inside the for future in as_completed(futures) loop, but the resulting progress display is quite ugly and jumps forward in big leaps. A sketch of that workaround follows.
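Concretely, the workaround looks roughly like this (a reconstruction, not my exact code; it relies on the if pb: guard in _process_chunk so the workers skip the bar entirely):

# Coarse workaround: the bar moves once per finished sub-chunk
# instead of once per byte, hence the big jumps.
progressbar = tqdm(total=self.filesize)
file = self.filepath.open(mode="rb")
executor = ProcessPoolExecutor(max_workers=FileProcessor.NJOBS)
with progressbar, file, executor:
    while (chunk := file.read(FileProcessor.CHK_SIZE)):
        futures = {executor.submit(FileProcessor._process_chunk, sc, None): len(sc)
                   for sc in subchunk(chunk)}
        for future in as_completed(futures):
            progressbar.update(futures[future])  # one jump per sub-chunk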

python multiprocessing concurrent.futures tqdm
1 Answer

You can spawn an extra process that is responsible for updating the progress bar. To send data to this process, you can use a multiprocessing Queue().

Here is the modified example:

import os
import time
from concurrent.futures import (ProcessPoolExecutor, ThreadPoolExecutor,
                                as_completed)
from multiprocessing import Process, Queue
from pathlib import Path

from tqdm import tqdm


class FileProcessor:
    NJOBS = 8  # 4 for multiprocessing
    CHK_SIZE = 1024 * 1024 * 80  # 80 MB

    def __init__(self, fp: str | Path):
        self.filepath = Path(fp)
        self.filesize = os.path.getsize(self.filepath)

    @staticmethod
    def _process_chunk(chunk: bytes, queue, *somemoreargs):
        for i in chunk:
            # do something with each byte value
            time.sleep(0.0001)
            queue.put_nowait(1)
        return "xxx"

    @staticmethod
    def _pb_updater(filesize, queue):
        # Runs in its own daemon process: it is the only writer to the bar,
        # so updates arriving from many workers stay consistent.
        progressbar = tqdm(total=filesize)

        while True:
            how_much_to_update = queue.get()
            progressbar.update(how_much_to_update)
            progressbar.refresh()

    def perform(self):
        def subchunk(chunk: bytes):
            schk_size = len(chunk) // FileProcessor.NJOBS
            if not schk_size:
                schk_size = len(chunk)  # will work on this later
            i = 0
            while schunk := chunk[i : i + schk_size]:
                yield schunk  # and some more info
                i += schk_size

        pb_queue = Queue()

        pb_updater = Process(
            target=FileProcessor._pb_updater,
            args=(self.filesize, pb_queue),
            daemon=True,
        )
        pb_updater.start()

        file = self.filepath.open(mode="rb")
        executor = ThreadPoolExecutor(max_workers=FileProcessor.NJOBS)
        with file, executor:
            while chunk := file.read(FileProcessor.CHK_SIZE):
                futures = [
                    executor.submit(FileProcessor._process_chunk, sc, pb_queue)
                    for sc in subchunk(chunk)
                ]
                for future in as_completed(futures):
                    _ = future.result()

            # do final stuff


if __name__ == "__main__":
    f = FileProcessor("some_big_file.tar")
    f.perform()

This prints:

  0%|                      | 98333/12200314880 [00:04<153:34:42, 22066.56it/s]
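Note that the example above still runs the workers in a ThreadPoolExecutor. If you want the workers in a ProcessPoolExecutor, a plain multiprocessing.Queue() cannot be pickled into the arguments of submit(), but a Manager().Queue() proxy can. A sketch of perform() under that assumption, with the rest of the class unchanged:

from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Manager, Process

class FileProcessor:
    # ... constants, __init__, _process_chunk, _pb_updater as above ...

    def perform(self):
        def subchunk(chunk: bytes):
            ...  # same generator as in the question

        manager = Manager()
        pb_queue = manager.Queue()  # proxy queue: picklable, unlike multiprocessing.Queue

        pb_updater = Process(
            target=FileProcessor._pb_updater,
            args=(self.filesize, pb_queue),
            daemon=True,  # dies together with the main process
        )
        pb_updater.start()

        file = self.filepath.open(mode="rb")
        executor = ProcessPoolExecutor(max_workers=FileProcessor.NJOBS)
        with file, executor:
            while chunk := file.read(FileProcessor.CHK_SIZE):
                futures = [
                    executor.submit(FileProcessor._process_chunk, sc, pb_queue)
                    for sc in subchunk(chunk)
                ]
                for future in as_completed(futures):
                    _ = future.result()

Be aware that each put_nowait(1) on a manager queue is an IPC round trip, so at one put per byte the reporting itself becomes a bottleneck; batching updates (e.g. one queue.put(n) every few thousand bytes) keeps the workers fast while the bar still moves smoothly.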