在python中多处理大量数据

问题描述 投票:0回答:1

我正在尝试使用多处理作为加快数据处理速度的一种方法。我的数据包含3000json文件,我的代码如下:

def analyse_page(file):
    with open(file) as f:
        data = json.load(f)
    for i in range(data):
        data[i] = treat_item(data[i])
    with open(output_json, 'w') as f:
        json.dump(f,data)

for file in files:
    analyse_page(file)

print('done!')

因此,想法是处理json的项目,然后输出修改后的json。我发现我的计算机为一个简单的For循环使用15%的Cpu功率,因此我决定使用Multiprocessing,但遇到了一个我无法理解的问题。我已经尝试过Process和Pool了,无论是成块的还是完全的,但是,每次它总是可以处理三分之一的文件,然后脚本停止运行而不会出错!

所以我再次使用if os.path.exists(): continue启动代码,以便忽略处理过的文件。即使这样,它也会处理另外三分之一的文件并停止。因此,当我再次启动它时,它又执行了另一次,然后打印done!

analyse_page函数每页大约需要3s,因此长时间内在多处理中启动相同功能的正确方法是什么?

更新,我已经完成的工作:

处理中

processes = []
for file in files:
    p = multiprocessing.Process(target=analyse_page, args=(file,))
    processes.append(p)
    p.start()
for process in processes:
    process.join()

批处理

def chunks(l, n):
    for i in range(0, len(l), n):
        yield l[i:i + n]

processes = []
numberOfThreads = 6 #Max is 8
For file in files:
    p = multiprocessing.Process(target=analyse_page, args=(file,))
    processes.append(p)

for i in chunks(processes,numberOfThreads):
    for j in i:
        j.start()
    for j in i:
        j.join()

游泳池

pool = multiprocessing.Pool(6)
For file in files:
    pool.map(analyse_page, file)
pool.close() 
python parallel-processing multiprocessing python-multithreading
1个回答
0
投票

为了轻松处理多进程,您可以使用concurrent.futures模块。

Python Documentation: Concurrent Futues

在我解释每一个asperct之前,有一个很棒的视频教程,带有示例代码(易于适应):

YouTube: Tutorial Multiprocessing

© www.soinside.com 2019 - 2024. All rights reserved.