Python 多处理 - 共享大型数据集

问题描述 投票:0回答:1

我正在尝试加速受 CPU 限制的 Python 脚本(在 Windows11 上)。 Python 中的威胁似乎无法在不同的 cpu(核心)上运行,因此我唯一的选择是多处理。

我有一个大字典数据结构(从文件加载后占用 11GB 内存),我正在检查计算值是否在该字典中。计算的输入也来自文件(大小为 100GB)。我可以将此输入批量映射到进程,没有问题。但我无法将字典复制到所有进程,因为没有足够的内存。所以我需要找到一种方法让进程检查该值(实际上是一个字符串)是否在字典中。

有什么建议吗?

伪程序流程:

--main--
- load dictionary structure from file   # 11GB memory footprint
- ...
- While not all chuncks loaded
-    Load chunk of calcdata from file   # (10.000 lines per chunk)
-    Distribute (map) calcdata-chunck to processes
-    Wait for processes to complete all chunks

--process--
- for each element in subchunk
-    perform calculation
-    check if calculation in dictionary  # here is my problem!
-    store result in file

编辑,实施下面的评论后,我现在位于:

def ReadDictFromFile()
    cnt=0
    print("Reading dictionary from " + dictfilename)
    with open(dictfilename, encoding=("utf-8"), errors=("replace")) as f:
        next(f) #skip first line (header)
        for line in f:
            s = line.rstrip("\n")
            (key,keyvalue) = s.split()
            shared_dict[str(key)]=keyvalue
            cnt = cnt + 1
            if ((cnt % 1000000) == 0): #log each 1000000 where we are
                print(cnt)
                return #temp to speed up testing, not load whole dictionary atm
    print("Done loading dictionary")        


def checkqlist(qlist)
    print(str(os.getpid()) + "-" + str(len(qlist)))
    
    for li in qlist:
        try:
            checkvalue = calculations(li)
        
            (found, keyval) = InMem(checkvalue)
                
            if (found):
                print("FOUND!!! " + checkvalue + ' ' + keyvalue)            
        except Exception as e:
            print("(" + str(os.getpid()) + ")Error log: %s" % repr(e))
            time.sleep(15)


def InMem(checkvalue):
    if(checkvalue in shared_dict):
        return True, shared_dict[checkvalue]
    else:
        return False, ""


if __name__ == "__main__":
    start_time = time.time()

    global shared_dict 
    manager = Manager()
    shared_dict = manager.dict()

    ReadDictFromFile()

    chunksize=5
    nr_of_processes = 10
    with open(filetocheck, encoding=("utf-8"), errors=("replace")) as f:
        qlist = []
        for line in f:
            s = line.rstrip("\n")
            qlist.append(s)
            if (len(qlist) >= (chunksize * nr_of_processes)):
                chunked_list = [qlist[i:i+chunk_size] for i in range(0, len(qlist), chunk_size)]
                try:
                    with multiprocessing.Pool() as pool:
                        pool.map(checkqlist, chunked_list, nr_of_processes)          #problem: qlist is a single string, not a list of about 416 strings.  
                except Exception as e:
                    print("error log: %s" % repr(e))
                    time.sleep(15)
    logit("Completed! " + datetime.datetime.now().strftime("%I:%M%p on %B %d, %Y"))
    print("--- %s seconds ---" % (time.time() - start_time))
python python-3.x dictionary multiprocessing large-data
1个回答
2
投票

你可以使用 multiprocessing.Manager.dict 为此,它是你可以用来在 python 中的进程之间进行检查的最快的 IPC,并且对于内存大小,只需通过将所有值更改为 None 来使其更小,在我的pc 它每秒可以进行 33k 成员检查...比普通字典慢大约 400 倍。

manager = Manager()
shared_dict = manager.dict()
shared_dict.update({x:None for x in main_dictionary})
shared_dict["new_element"] = None  # to set another value
del shared_dict["new_element"]  # to delete a certain value

您还可以使用专用的内存数据库,例如 redis,它可以同时处理多个进程的轮询。

@Sam Mason 建议使用 WSL 和 fork 可能会更好,但这个是最便携的。

编辑:要将其存储在子全局范围中,您必须通过初始值设定项传递它。

def define_global(var):
    global shared_dict
    shared_dict = var
...
if __name__ == "__main__":
...

    with multiprocessing.Pool(initializer=define_global, initargs=(shared_dict ,)) as pool:

Pool.map
有参数(函数、可迭代、块大小),您可以将 chunksize 留空,这有一个很好的默认值,或者将其设置为
1
如果任务足够大,您可以 将其设置为任何其他除非你清楚地了解它的作用,否则它基本上是“每个工人的任务”

© www.soinside.com 2019 - 2024. All rights reserved.