I'm working on a data-intensive project that processes large datasets through a custom pipeline. The pipeline consists of several stages, each handling a specific aspect of the data transformation. To improve performance, I decided to parallelize these stages with multithreading via the concurrent.futures module.
Here is an oversimplified version of the current code:
import concurrent.futures

class DataProcessor:
    def __init__(self, data):
        self.data = data

    def stage_one(self, chunk):
        result = [item * 2 for item in chunk]
        return result

    def stage_two(self, chunk):
        result = [item + 5 for item in chunk]
        return result

    def run_pipeline(self):
        # Split the data into chunks for parallel processing
        chunk_size = len(self.data) // 2
        chunk_one, chunk_two = self.data[:chunk_size], self.data[chunk_size:]
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_one = executor.submit(self.stage_one, chunk_one)
            future_two = executor.submit(self.stage_two, chunk_two)
            concurrent.futures.wait([future_one, future_two])
            result_one = future_one.result()
            result_two = future_two.result()
        final_result = result_one + result_two
        return final_result

def load_large_dataset():
    return list(range(1000000))

data = load_large_dataset()
processor = DataProcessor(data)
result = processor.run_pipeline()
print(result[:10])
When running the pipeline, I'm seeing some peculiar, unexpected behavior: the results sometimes differ for the same dataset. Strangely, I only noticed this after accidentally running the pipeline hundreds of times without changing the input.
I've tried different threading strategies, such as using the threading module directly, but the problem seems to persist.
Any insights, examples, or resources would be greatly appreciated.
As a general rule, CPU-bound parallel processing should be done in processes rather than threads. In this particular case, however, multithreading outperforms multiprocessing.
With multiprocessing, objects passed to and from the child processes must be serialized. The objects in this example are roughly 25 MB each as pickled data.
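To get a feel for that serialization cost, here is a minimal sketch (not part of the original code) that measures the pickled size of one half-chunk of the 10**7-element dataset used below:

```python
import pickle

# Pickle one chunk (half of the 10**7-element dataset) and report its size.
# This is the payload each worker process would have to receive and send back.
chunk = list(range(10**7 // 2))
size_mb = len(pickle.dumps(chunk)) / 1_000_000
print(f"Pickled chunk size: {size_mb:.1f} MB")
```

That per-task serialization (and the matching deserialization of the results) is pure overhead that the thread-based version never pays, since threads share the same address space.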
Performance will also vary considerably depending on the Python version and platform (OS) you're running on.
I've modified the code from the question to demonstrate the difference between the techniques described above.
The results shown come from Python 3.12.1 running on macOS 14.2.1 (Apple Silicon M2):
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from time import perf_counter

class DataProcessor:
    def __init__(self, data):
        self.data = data

    def process(self, t):
        # Unpack a (stage function, chunk) pair and apply the stage
        func, chunk = t
        return func(chunk)

    def stage_one(self, chunk):
        return [item * 2 for item in chunk]

    def stage_two(self, chunk):
        return [item + 5 for item in chunk]

    def run_pipeline(self, executor):
        # Split the data into chunks for parallel processing
        chunk_size = len(self.data) // 2
        chunk_one, chunk_two = self.data[:chunk_size], self.data[chunk_size:]
        args = [
            (self.stage_one, chunk_one),
            (self.stage_two, chunk_two)
        ]
        with executor(len(args)) as exec:
            result = []
            for r in exec.map(self.process, args):
                result.extend(r)
        return result

def load_large_dataset():
    return list(range(10**7))

if __name__ == "__main__":
    for executor in ThreadPoolExecutor, ProcessPoolExecutor:
        start = perf_counter()
        data = load_large_dataset()
        processor = DataProcessor(data)
        result = processor.run_pipeline(executor)
        end = perf_counter()
        print(executor.__name__, f"Duration = {end-start:.4f}")
        print(result[:10])
Output:
ThreadPoolExecutor Duration = 0.3246
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
ProcessPoolExecutor Duration = 1.3156
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]