Multithreading problems in a Python data-processing pipeline


I'm working on a data-intensive project that processes large datasets through a custom pipeline. The pipeline consists of several stages, each handling a specific aspect of the data transformation. To improve performance, I decided to use the concurrent.futures module with threads to parallelize these stages.

Here is an oversimplified version of the current code:

import concurrent.futures

class DataProcessor:
    def __init__(self, data):
        self.data = data

    def stage_one(self, chunk):
        result = [item * 2 for item in chunk]
        return result

    def stage_two(self, chunk):
        result = [item + 5 for item in chunk]
        return result

    def run_pipeline(self):
        # Split the data into chunks for parallel processing
        chunk_size = len(self.data) // 2
        chunk_one, chunk_two = self.data[:chunk_size], self.data[chunk_size:]

        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_one = executor.submit(self.stage_one, chunk_one)
            future_two = executor.submit(self.stage_two, chunk_two)

            concurrent.futures.wait([future_one, future_two])

            result_one = future_one.result()
            result_two = future_two.result()

        final_result = result_one + result_two

        return final_result

def load_large_dataset():
    return list(range(1000000))

data = load_large_dataset()
processor = DataProcessor(data)
result = processor.run_pipeline()

print(result[:10])

When I run the pipeline I see some peculiar, unexpected behavior: the results sometimes differ for the same dataset. Oddly enough, I only noticed this when I accidentally ran the pipeline hundreds of times without changing the input.

I've tried different threading strategies, for example using the threading module directly, but the problem seems to persist.
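That threading variant looked roughly like this (a simplified sketch; the worker helper and the results dict are illustrative names rather than my exact code):

import threading

def run_with_threads(processor, chunk_one, chunk_two):
    results = {}

    def worker(key, func, chunk):
        # Each thread writes to its own key, so the two threads never
        # touch the same entry.
        results[key] = func(chunk)

    threads = [
        threading.Thread(target=worker, args=("one", processor.stage_one, chunk_one)),
        threading.Thread(target=worker, args=("two", processor.stage_two, chunk_two)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results["one"] + results["two"]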

Any insights, examples, or resources would be greatly appreciated.

python multithreading concurrency
1 Answer

As a general rule, CPU-bound parallel work should be done in processes rather than threads, since CPython's GIL keeps pure-Python code from running on more than one core at a time. In this case, however, multithreading outperforms multiprocessing.

With multiprocessing, objects passed to and from the worker processes have to be serialized. In this example each chunk, pickled, comes to roughly 25 MB.
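You can get a feel for that cost by checking the pickled size of one chunk directly (a rough sketch; the exact figure depends on the data and the pickle protocol in use):

import pickle

# Rough estimate of the per-chunk serialization cost when using processes.
data = list(range(10**7))
chunk = data[:len(data) // 2]   # one of the two chunks handed to a worker
payload = pickle.dumps(chunk)
print(f"{len(payload) / 2**20:.1f} MB")   # roughly 25 MB per chunk here

That cost is paid on the way into each worker process and again for the results coming back, which is why the process pool loses here despite the GIL.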

Performance will also vary considerably depending on the Python version and the platform (OS) you're running on.

I've modified the code from the question to demonstrate the difference between the techniques described above.

The results shown are from Python 3.12.1 running on macOS 14.2.1 (Apple Silicon M2).

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from time import perf_counter

class DataProcessor:
    def __init__(self, data):
        self.data = data

    def process(self, t):
        # Unpack the (stage function, chunk) pair and run that stage.
        func, chunk = t
        return func(chunk)
    
    def stage_one(self, chunk):
        return [item * 2 for item in chunk]

    def stage_two(self, chunk):
        return [item + 5 for item in chunk]

    def run_pipeline(self, executor):
        # Split the data into chunks for parallel processing
        chunk_size = len(self.data) // 2
        chunk_one, chunk_two = self.data[:chunk_size], self.data[chunk_size:]
        args = [
                (self.stage_one, chunk_one),
                (self.stage_two, chunk_two)
            ]
        # One worker per task; map() yields results in submission order,
        # so the chunks are reassembled in their original order.
        with executor(len(args)) as pool:
            result = []
            for r in pool.map(self.process, args):
                result.extend(r)
            return result


def load_large_dataset():
    return list(range(10**7))

if __name__ == "__main__":
    # Time the same pipeline with each executor type; note that the timed
    # region also includes rebuilding the dataset.
    for executor in ThreadPoolExecutor, ProcessPoolExecutor:
        start = perf_counter()
        data = load_large_dataset()
        processor = DataProcessor(data)
        result = processor.run_pipeline(executor)
        end = perf_counter()
        print(executor.__name__, f"Duration = {end-start:.4f}")
        print(result[:10])

Output:

ThreadPoolExecutor Duration = 0.3246
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
ProcessPoolExecutor Duration = 1.3156
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]