编辑:解决方案我首先使用split
bash函数将我的文件拆分为10000行。然后:
with Pool(processes=32) as pool:
for level in range(75):
all_results=[]
for f in level_dir:
res = pool.apply_async(process_file, args=[f, level]
all_results.append(res)
for res in all_results:
res.get()
save_matrix()
我正在使用几个非常大的CSV文件(其中74个,10GB到65GB),并且需要逐行读取它们以从中提取数据并将其放入矩阵中。
我正在使用Python,因为使用C / C ++解析具有空字段和JSON字段的CSV文件并不容易。
我现在正在做的是我使用ThreadPool,但它看起来并不像它使用CPU到它的全部容量(Xeon E5),我认为这可能是因为矩阵填充。
M = np.zeros((users.size, levels.size, 2))
def process_line(row):
data = json.loads(row[3])
usr = data['usr']
#compute stuff
M[usr, level, 0] = score
M[usr, level, 1] = t_len
def main():
for level in range(75):
csv_f = open("level{}.csv".format(level))
reader = csv.reader(csv_f)
t = ThreadPool(processes=32)
for row in reader:
t.map(process_line, (row, level, ))
t.join()
t.close()
np.save("matrix.npy", M)
当我在每一行处理中打印时间戳时,看起来改变进程数并不会改变任何东西,它与不使用ThreadPool时一样慢。
我该怎么做才能让我的代码运行得更快?
如果我继续这样做,那么它将需要3个月才能完成。
你可以开始使用qazxsw poi打开每个文件作为qazxsw poi,然后选择列(让我们说pandas
,df = pd.read_csv("level{}.csv".format(level))
,...)并通过col1
提取值矩阵
鉴于您的文件大小,我建议您使用col2
处理每个文件并将矩阵保存为一个漂亮的格式。然后,您可以使用mat = df[["col1", "col2"]].values
处理矩阵