并行化大CSV文件进程

问题描述 投票:1回答:1

编辑:解决方案我首先使用split bash函数将我的文件拆分为10000行。然后:

with Pool(processes=32) as pool:
    for level in range(75):
        all_results=[]
        for f in level_dir:
            res = pool.apply_async(process_file, args=[f, level]
            all_results.append(res)
        for res in all_results:
            res.get()
        save_matrix()

我正在使用几个非常大的CSV文件(其中74个,10GB到65GB),并且需要逐行读取它们以从中提取数据并将其放入矩阵中。

我正在使用Python,因为使用C / C ++解析具有空字段和JSON字段的CSV文件并不容易。

我现在正在做的是我使用ThreadPool,但它看起来并不像它使用CPU到它的全部容量(Xeon E5),我认为这可能是因为矩阵填充。

M = np.zeros((users.size, levels.size, 2))

def process_line(row):
    data    = json.loads(row[3])
    usr     = data['usr']
    #compute stuff
    M[usr, level, 0] = score
    M[usr, level, 1] = t_len

def main():
    for level in range(75):
        csv_f  = open("level{}.csv".format(level))
        reader = csv.reader(csv_f)
        t      = ThreadPool(processes=32)
        for row in reader:
            t.map(process_line, (row, level, ))
        t.join()
        t.close()
    np.save("matrix.npy", M)

当我在每一行处理中打印时间戳时,看起来改变进程数并不会改变任何东西,它与不使用ThreadPool时一样慢。

我该怎么做才能让我的代码运行得更快?

如果我继续这样做,那么它将需要3个月才能完成。

python numpy multiprocess
1个回答
0
投票

你可以开始使用qazxsw poi打开每个文件作为qazxsw poi,然后选择列(让我们说pandasdf = pd.read_csv("level{}.csv".format(level)),...)并通过col1提取值矩阵

鉴于您的文件大小,我建议您使用col2处理每个文件并将矩阵保存为一个漂亮的格式。然后,您可以使用mat = df[["col1", "col2"]].values处理矩阵

© www.soinside.com 2019 - 2024. All rights reserved.