我在 PyTorch 中实现了一些模型,在自定义平台上评估它们的性能(包装 Pytorch,保持整体界面)。
但这确实很慢:在单个 CPU 上测试 10k CIFAR10 需要近 30 分钟。我的云农场没有可用的 GPU,但高度以 CPU 为导向,具有可用的内存负载。因此,我正在考虑生成多个线程/进程来并行化这些推理测试。
我知道由于 GIL 和 Pytorch 资源模型,这对于 Python 来说并不是那么简单;从一些研究中我发现
torch.multiprocessing.Pool
。
这是最好的方法吗?我如何在
N
CPU 上部署 N
推理任务,然后将结果收集到数组中?我想知道某些 torch.device
信息是否必须处理或自动完成。
类似:
for task in inference_tasks:
p = spawn(process)
accuracy = inference(model, p)
....
#collect results
results.append(accuracy)
Python 中的并行性非常简单。棘手之处在于如何分配你的工作,并且共享内存/状态是困难和/或耗时的。理想的可并行函数需要很少/小的输入并返回很少/小的输出。
sum(range(N, M))
非常理想。它接受两个整数作为输入并返回一个整数。示例:
import os
from concurrent.futures import ProcessPoolExecutor, as_completed
# job to parallelise computation of sum(range(N, M))
N = 0
M = 1_000_000_000
range_ = range(N, M)
with ProcessPoolExecutor(max_workers=os.cpu_count()) as pool:
# compute batch size
chunk_size, remainder = divmod(len(range_), os.cpu_count())
if remainder:
chunk_size += 1
# split job into roughly equal size chunks
futures = []
for i in range(os.cpu_count()):
fut = pool.submit(sum, range_[i*chunk_size:(i+1)*chunk_size])
futures.append(fut)
# process results as and when they become ready
total = 0
for future in as_completed(futures):
total += future.result()
print(f'{total=}')
max_workers=os.cpu_count()
并不是严格需要的,因为这是 ProcessPoolExecutor
的默认行为。