Python multiprocessing.Processgetting在尝试返回结果时生成AssertionError?

问题描述 投票:0回答:1

我正在尝试使用multiprocessing进行一项任务,这个任务在单个进程完成时非常慢。正如您在下面的代码中看到的,每个进程都应该返回一些结果(return_dict)。我最初使用10K行数据集(存储在docs.txt文件中的数据,大约70mb)测试了此代码,代码按预期运行。但是,当我使用脚本完成整个数据集(大约5.6gb)时,我得到了一个AssertionError,如我问题的底部所示。我想知道是否有人知道可能导致它的原因以及我如何能够避免它。谢谢。

from multiprocessing import Process, Manager
import os, io, numpy
from gensim.models.doc2vec import Doc2Vec

def worker(i, data, return_dict):
    model = Doc2Vec.load("D:\\Project1\\doc2vec_model_DM_20180814.model")
    results = numpy.zeros((len(data), model.vector_size))
    for id, doc in enumerate(data):
        results[id,:] = model.infer_vector(doc, alpha = 0.01, steps = 100)
    return_dict[i] = results

if __name__ == '__main__':
    import time
    a  = time.time()
    path = "D:\\Project1\\docs.txt"    # <<=== data stored in this file
    data = []
    manager = Manager()
    jobs = []
    return_dict = manager.dict()

    with io.open(path, "r+", encoding = "utf-8") as datafile:
        for id, row in enumerate(datafile):
            row = row.strip().split('\t')[0].split()
            data.append(row)

    step = numpy.floor(len(data)/20)
    intervals = numpy.arange(0, len(data), step = int(step)).tolist()
    intervals.append(len(data))

    for i in range(len(intervals) - 1):
        p = Process(target=worker, args=(i, data[intervals[i]:intervals[i+1]], return_dict))
        jobs.append(p)
        p.start()
    for proc in jobs:
        proc.join()

    results = numpy.zeros((len(data), 1000))
    start = 0
    end = 0
    for _, result in return_dict.items():    #<<===Where error happens
        end = end + result.shape[0]
        results[start:end,:] = result[:,:]
        start = end

    print(time.time() - a)

错误信息:

Traceback (most recent call last):
  File "D:\Project1\multiprocessing_test.py", line 43, in <module>
    for _, result in return_dict.items():
  File "<string>", line 2, in items
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\managers.py", line 757, in _callmethod
    kind, result = conn.recv()
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 318, in _recv_bytes
    return self._get_more_data(ov, maxsize)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 337, in _get_more_data
    assert left > 0
AssertionError
python python-3.x multiprocessing python-multithreading
1个回答
0
投票

我想你正在使用所有可用的内存。 dict.items()创建了一个充满你所有物品并使用大量记忆的词典副本。最好只使用dict.iteritems()迭代你的结果。

编辑:对不起,我最初没有注意到python-3标签。在Python3中,dict.items()does不再返回副本,应该可以使用。

多处理中connection.py的相关代码是

left = _winapi.PeekNamedPipe(self._handle)[1]
assert left > 0

你在使用Windows吗?所以我猜这是一些与Windows相关的问题,似乎PeekNamedPipe返回0。

© www.soinside.com 2019 - 2024. All rights reserved.