多处理池故障排除:变量的序列化问题和全局访问困境

问题描述 投票:0回答:1

我面临多处理池和将变量传递给子进程的问题。


import vmaps_shortcutrun as sc
import ifcopenshell
import vmaps_functions as f
from multiprocessing import Pool

def oarun(oanr):

    oa = g_oalist[oanr]
    id = g_scrow['id']
    ifcfolderpathind = g_scrow['ifcfolderpathind']
    ifcind = f.oaextract(g_ifc,oa,ifcfolderpathind)

    print(oa)

    # do some actions which do not need a return

def poolrun(oalistlen):

    oanrlist = tuple(range(1,oalistlen))

    if __name__ == '__main__':
        with Pool() as pool:
            pool.map(oarun,oanrlist)


if __name__ == '__main__':

    # Do some stuff that can only run once and output a list of dictionaries
    sclist = sc.scrun()

    for scrow in sclist:

        global g_ifc
        global g_scrow
        global g_oalist

        g_scrow = scrow                             # result = dictionary of values
        g_ifc = ifcopenshell.open(scrow['file'])    # result = an object which can not be transferred with multiprocessing manager, starmap, partial ..... ->   pickle error
        g_oalist = f.objectassemblylist(g_ifc)      # result = an list of objects which can not be transferred with multiprocessing manager, starmap, partial ..... ->   pickle error

        poolrun(len(g_oalist))

我的简化代码存在以下问题:

g_ifc 和 g_oalist 无法作为变量传递到多处理池,因为它们无法被 pickle。我尝试使用Manager、starmap、partial等,但没有效果。 g_ifc = ifcopenshell.open(scrow['file']) 是一个非常消耗资源的过程,因此它应该只在 scrow 的每次迭代中发生一次。 仅传递文件路径不是一个选项,因为 oalist 可以有超过 3000 个元素,这会减少多处理的使用。 g_oalist需要在池之前初始化,因为它是池计数的基础。 作为替代方案,我尝试将变量放置为全局变量。

如果我将全局变量放在文件顶部并且仅直接将其作为 Python 脚本运行(没有 scrow 循环),则此方法有效。

我需要 if name == 'main': 在 scrow 循环之前,否则池将无限期地重新运行此循环。但是,使用此 if 语句会导致全局变量对于 oarun 函数不可用。

我已经尝试过以下方法:

  • 使用 Manager 传递变量:类型错误:无法 pickle 'SwigPyObject' 对象。
  • 使用部分传递变量:TypeError:无法pickle“SwigPyObject”对象。
  • 使用星图传递变量:类型错误:无法腌制“SwigPyObject”对象。
  • 在不同进程的 exec() 中运行整个代码的文本副本(如果 name == 'main':在原始进程中必需),导致全局变量不可用。

此时,唯一可行的方法是制作包含 scrow 循环每次迭代的代码的不同 Python 文件,并从 Windows 批处理文件运行它们。这样,Python 就会将其视为不同的进程。

import ifcopenshell
from multiprocessing import Pool

g_scrow = {'id': '1', 'rundate': '2024-04-10_16-25-48', 'projectnr': '', 'shortcut': 'S:/00. Projecten/TEST Snelkoppeling.lnk', 'file': 'S:/Snelkoppelingen/TEST.ifc', 'filedate': '2022-12-22_12-00-39', 'ifcfolderpathind': 'S:/Snelkoppelingen/1. Individueel', 'ifcfolderpath': 'S:/Snelkoppelingen/1. Individueel/1. ifc_2024-04-10_16-25-48_2022-12-22_12-00-39', 'csvdatafile': 'S:/Snelkoppelingen/1. ifc_2024-04-10_16-25-48_2022-12-22_12-00-39/vmaps_ifcdata.csv', 'csvpropfile': 'S:/Snelkoppelingen/1. ifc_2024-04-10_16-25-48_2022-12-22_12-00-39/vmaps_ifcprop.csv', 'type': 'latest', 'processed': '', 'thread': ''}
g_ifc = ifcopenshell.open(g_scrow['file'])
g_oalist = f.objectassemblylist(g_ifc)
oalistlen = len(g_oalist)

def oarun(oanr):

    oa = g_oalist[oanr]
    id = g_scrow['id']
    ifcfolderpathind = g_scrow['ifcfolderpathind']
    ifcind = f.oaextract(g_ifc,oa,ifcfolderpathind)

    print(oa)

    # do some actions which do not need a return

def poolrun(oalistlen):

    oanrlist = tuple(range(1,oalistlen))

    if __name__ == '__main__':
        with Pool() as pool:
            pool.map(oarun,oanrlist)

总之,我需要一个解决方案,允许并行处理对象(g_oalist),同时有效处理打开 g_ifc 的资源密集型操作,而不会遇到 pickling 错误。

python global-variables pickle python-multiprocessing ifc-open-shell
1个回答
0
投票

您可以通过使用初始化器使不可pickle对象作为全局可见:

from multiprocessing import Pool

def initialiser(gList_):
    global gList
    gList = gList_

def process(n: int):
    global gList
    print(gList[n])

def main():
    gList = list(range(10))
    with Pool(initializer=initialiser, initargs=(gList,)) as pool:
        for _ in pool.map_async(process, range(10)).get():
            pass


if __name__ == "__main__":
    main()

当然,在这种情况下 gList 可以被 pickle,但这只是演示了如何实现你的目标。

输出:

0
1
2
3
4
5
6
7
8
9
© www.soinside.com 2019 - 2024. All rights reserved.