I am facing a problem with a multiprocessing pool and passing variables to the child processes.
import vmaps_shortcutrun as sc
import ifcopenshell
import vmaps_functions as f
from multiprocessing import Pool

def oarun(oanr):
    oa = g_oalist[oanr]
    id = g_scrow['id']
    ifcfolderpathind = g_scrow['ifcfolderpathind']
    ifcind = f.oaextract(g_ifc, oa, ifcfolderpathind)
    print(oa)
    # do some actions which do not need a return

def poolrun(oalistlen):
    oanrlist = tuple(range(1, oalistlen))
    if __name__ == '__main__':
        with Pool() as pool:
            pool.map(oarun, oanrlist)

if __name__ == '__main__':
    # Do some stuff that can only run once and output a list of dictionaries
    sclist = sc.scrun()
    for scrow in sclist:
        global g_ifc
        global g_scrow
        global g_oalist
        g_scrow = scrow  # result = dictionary of values
        g_ifc = ifcopenshell.open(scrow['file'])  # result = an object which can not be transferred with multiprocessing Manager, starmap, partial ..... -> pickle error
        g_oalist = f.objectassemblylist(g_ifc)  # result = a list of objects which can not be transferred with multiprocessing Manager, starmap, partial ..... -> pickle error
        poolrun(len(g_oalist))
My simplified code has the following problems:
g_ifc and g_oalist cannot be passed as variables into the multiprocessing pool because they cannot be pickled. I have tried Manager, starmap, partial, etc., without success. g_ifc = ifcopenshell.open(scrow['file']) is a very resource-intensive operation, so it should happen only once per iteration of scrow. Passing only the file path to the workers is not an option, because oalist can have more than 3000 elements, which would defeat the point of multiprocessing. g_oalist needs to be initialised before the pool, because its length determines the number of pool tasks. As an alternative, I tried making the variables global.
This works if I put the globals at the top of the file and run it directly as a Python script (without the scrow loop).
I need if __name__ == '__main__': before the scrow loop, otherwise the pool re-runs that loop indefinitely. However, using this if statement makes the globals unavailable to the oarun function.
I have already tried the following:
At the moment, the only workable approach is to create a separate Python file containing the code for each iteration of the scrow loop and run them from a Windows batch file. That way Python treats them as separate processes.
import ifcopenshell
import vmaps_functions as f  # needed for objectassemblylist/oaextract below
from multiprocessing import Pool

g_scrow = {'id': '1', 'rundate': '2024-04-10_16-25-48', 'projectnr': '', 'shortcut': 'S:/00. Projecten/TEST Snelkoppeling.lnk', 'file': 'S:/Snelkoppelingen/TEST.ifc', 'filedate': '2022-12-22_12-00-39', 'ifcfolderpathind': 'S:/Snelkoppelingen/1. Individueel', 'ifcfolderpath': 'S:/Snelkoppelingen/1. Individueel/1. ifc_2024-04-10_16-25-48_2022-12-22_12-00-39', 'csvdatafile': 'S:/Snelkoppelingen/1. ifc_2024-04-10_16-25-48_2022-12-22_12-00-39/vmaps_ifcdata.csv', 'csvpropfile': 'S:/Snelkoppelingen/1. ifc_2024-04-10_16-25-48_2022-12-22_12-00-39/vmaps_ifcprop.csv', 'type': 'latest', 'processed': '', 'thread': ''}
g_ifc = ifcopenshell.open(g_scrow['file'])
g_oalist = f.objectassemblylist(g_ifc)
oalistlen = len(g_oalist)

def oarun(oanr):
    oa = g_oalist[oanr]
    id = g_scrow['id']
    ifcfolderpathind = g_scrow['ifcfolderpathind']
    ifcind = f.oaextract(g_ifc, oa, ifcfolderpathind)
    print(oa)
    # do some actions which do not need a return

def poolrun(oalistlen):
    oanrlist = tuple(range(1, oalistlen))
    if __name__ == '__main__':
        with Pool() as pool:
            pool.map(oarun, oanrlist)
In summary, I need a solution that allows the objects (g_oalist) to be processed in parallel while handling the resource-intensive opening of g_ifc efficiently, without running into pickling errors.
You can make unpicklable objects visible as globals in the worker processes by using an initializer:
from multiprocessing import Pool

def initialiser(gList_):
    global gList
    gList = gList_

def process(n: int):
    global gList
    print(gList[n])

def main():
    gList = list(range(10))
    with Pool(initializer=initialiser, initargs=(gList,)) as pool:
        for _ in pool.map_async(process, range(10)).get():
            pass

if __name__ == "__main__":
    main()
Of course, in this case gList could be pickled anyway, but this just demonstrates how to achieve your goal.
Output:
0
1
2
3
4
5
6
7
8
9
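Note that initargs are themselves pickled when they are sent to the workers, so a truly unpicklable object (such as the ifcopenshell model) has to be built inside the initializer rather than passed through it: hand each worker the file path, and let every worker open its own copy once at pool start-up. Below is a minimal, self-contained sketch of that idea; IfcModel is a hypothetical, deliberately unpicklable stand-in for the real ifcopenshell.open result, and in your code you would call ifcopenshell.open and f.objectassemblylist inside the initializer instead:

```python
from multiprocessing import Pool

class IfcModel:
    """Hypothetical stand-in for an ifcopenshell model: unpicklable on purpose."""
    def __init__(self, path):
        self.path = path
        self.assemblies = ["oa%d" % i for i in range(5)]
    def __reduce__(self):
        # Simulate the real model's behaviour: any pickling attempt fails.
        raise TypeError("IfcModel cannot be pickled")

def initialiser(path):
    # Runs once per worker at pool start-up. Only the picklable path is
    # transferred; the expensive, unpicklable model is built locally, so it
    # never crosses a process boundary.
    global g_model
    g_model = IfcModel(path)

def oarun(oanr):
    # Workers read the per-process global set up by the initialiser.
    return g_model.assemblies[oanr]

def poolrun(path, count):
    with Pool(initializer=initialiser, initargs=(path,)) as pool:
        return pool.map(oarun, range(count))

if __name__ == "__main__":
    print(poolrun("TEST.ifc", 5))  # -> ['oa0', 'oa1', 'oa2', 'oa3', 'oa4']
```

The trade-off is that the model is opened once per worker process rather than once in total; for a handful of workers and 3000+ tasks per file that is usually still far cheaper than pickling per task.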