我正在尝试使用multiprocessing
进行一项任务,这个任务在单个进程完成时非常慢。正如您在下面的代码中看到的,每个进程都应该返回一些结果(return_dict
)。我最初使用10K行数据集(存储在docs.txt
文件中的数据,大约70mb)测试了此代码,代码按预期运行。但是,当我使用脚本完成整个数据集(大约5.6gb)时,我得到了一个AssertionError
,如我问题的底部所示。我想知道是否有人知道可能导致它的原因以及我如何能够避免它。谢谢。
from multiprocessing import Process, Manager
import os, io, numpy
from gensim.models.doc2vec import Doc2Vec
def worker(i, data, return_dict):
model = Doc2Vec.load("D:\\Project1\\doc2vec_model_DM_20180814.model")
results = numpy.zeros((len(data), model.vector_size))
for id, doc in enumerate(data):
results[id,:] = model.infer_vector(doc, alpha = 0.01, steps = 100)
return_dict[i] = results
if __name__ == '__main__':
import time
a = time.time()
path = "D:\\Project1\\docs.txt" # <<=== data stored in this file
data = []
manager = Manager()
jobs = []
return_dict = manager.dict()
with io.open(path, "r+", encoding = "utf-8") as datafile:
for id, row in enumerate(datafile):
row = row.strip().split('\t')[0].split()
data.append(row)
step = numpy.floor(len(data)/20)
intervals = numpy.arange(0, len(data), step = int(step)).tolist()
intervals.append(len(data))
for i in range(len(intervals) - 1):
p = Process(target=worker, args=(i, data[intervals[i]:intervals[i+1]], return_dict))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
results = numpy.zeros((len(data), 1000))
start = 0
end = 0
for _, result in return_dict.items(): #<<===Where error happens
end = end + result.shape[0]
results[start:end,:] = result[:,:]
start = end
print(time.time() - a)
错误信息:
Traceback (most recent call last):
File "D:\Project1\multiprocessing_test.py", line 43, in <module>
for _, result in return_dict.items():
File "<string>", line 2, in items
File "C:\ProgramData\Anaconda3\lib\multiprocessing\managers.py", line 757, in _callmethod
kind, result = conn.recv()
File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 250, in recv
buf = self._recv_bytes()
File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 318, in _recv_bytes
return self._get_more_data(ov, maxsize)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 337, in _get_more_data
assert left > 0
AssertionError
我想你正在使用所有可用的内存。 dict.items()
创建了一个充满你所有物品并使用大量记忆的词典副本。最好只使用dict.iteritems()
迭代你的结果。
编辑:对不起,我最初没有注意到python-3标签。在Python3中,dict.items()
does不再返回副本,应该可以使用。
多处理中connection.py的相关代码是
left = _winapi.PeekNamedPipe(self._handle)[1]
assert left > 0
你在使用Windows吗?所以我猜这是一些与Windows相关的问题,似乎PeekNamedPipe返回0。