我正在尝试使用多处理作为加快数据处理速度的一种方法。我的数据包含3000json文件,我的代码如下:
def analyse_page(file):
with open(file) as f:
data = json.load(f)
for i in range(data):
data[i] = treat_item(data[i])
with open(output_json, 'w') as f:
json.dump(f,data)
for file in files:
analyse_page(file)
print('done!')
因此,想法是处理json的项目,然后输出修改后的json。我发现我的计算机为一个简单的For循环使用15%的Cpu功率,因此我决定使用Multiprocessing,但遇到了一个我无法理解的问题。我已经尝试过Process和Pool了,无论是成块的还是完全的,但是,每次它总是可以处理三分之一的文件,然后脚本停止运行而不会出错!
所以我再次使用if os.path.exists(): continue
启动代码,以便忽略处理过的文件。即使这样,它也会处理另外三分之一的文件并停止。因此,当我再次启动它时,它又执行了另一次,然后打印done!
analyse_page
函数每页大约需要3s,因此长时间内在多处理中启动相同功能的正确方法是什么?
更新,我已经完成的工作:
处理中
processes = []
for file in files:
p = multiprocessing.Process(target=analyse_page, args=(file,))
processes.append(p)
p.start()
for process in processes:
process.join()
批处理
def chunks(l, n):
for i in range(0, len(l), n):
yield l[i:i + n]
processes = []
numberOfThreads = 6 #Max is 8
For file in files:
p = multiprocessing.Process(target=analyse_page, args=(file,))
processes.append(p)
for i in chunks(processes,numberOfThreads):
for j in i:
j.start()
for j in i:
j.join()
游泳池
pool = multiprocessing.Pool(6)
For file in files:
pool.map(analyse_page, file)
pool.close()
为了轻松处理多进程,您可以使用concurrent.futures模块。
Python Documentation: Concurrent Futues
在我解释每一个asperct之前,有一个很棒的视频教程,带有示例代码(易于适应):