多处理:pool.imap_unordered;无法pickle功能

问题描述 投票:0回答:1

我正在处理 Lopez de Prado 所著书中的一个函数。这是我正在尝试做的一个最小的例子:

import numpy as np
import pandas as pd
import multiprocessing as mp

# Define test function
test_func = np.vectorize(lambda idx, a, b: [idx, a**1 + b**2])

test_df = pd.DataFrame(np.random.normal(size = (10, 2)), columns = ['a', 'b'])

test_df = test_df.reset_index()

func_dict = {col:test_df[col] for col in test_df.columns}

# Function used to evaluate test_func
def expandCall(kargs):

    # Get the function arguement
    func = kargs['func']

    # Delete it from the fictionary
    del kargs['func']

    # Evaluate function with other arguments
    out = func(**kargs)

    return out

parts = np.linspace(0, test_df.shape[0], 2).astype(int)

jobs = []

for i, j in zip(parts[:-1], parts[1:]):

    job = {key:func_dict[key][i:j] for key in func_dict}
    job.update({'func':test_func})
    jobs.append(job)
    
pool = mp.Pool(processes = 2)

outputs = pool.imap_unordered(expandCall, jobs)

out = []

# Process asynchronous output, report progress
for out_ in outputs:

    out.append(out_)

pool.close()
pool.join()

我收到错误

PicklingError:无法 pickle :在 --main-- 上查找属性失败

如果我正确阅读这本书,洛佩兹·德普拉多提供了一个解决方案:

def _unpickle_method(func_name, obj, cls):
    
    for cls in cls.mro():
        
        try:
            
            func = cls.__dict__[func_name]
            
        except KeyError:
            
            pass
        
        else:
            
            break
        
    return func.__get__(obj, cls)


def _pickle_method(method):
    
    func_name = method.im_func.__name__
    
    obj = method.im_self
    
    cls = method.im_class
    
    return _unpickle_method, (func_name, obj, cls)

import copyreg, types

copyreg.pickle(types.MethodType, _pickle_method, _unpickle_method)

但是,这对我不起作用。也许是由于我正在阅读的书出版后多处理模块发生了变化。不幸的是,我不太了解该解决方案,无法更新它。任何让我的最小示例运行的帮助将不胜感激。

pickle python-multiprocessing
1个回答
0
投票

您发布的方法允许您pickle类方法,lambda不是类方法,至少在python3中不是,并且标准python pickler不允许pickle lambda。

您可以使用 cloudpickle 来腌制 lambda,您必须执行

obj = cloudpickle.dumps(...)
然后将
obj
传递到池中,然后将
cloudpickle.loads(obj)
传递到另一侧。

© www.soinside.com 2019 - 2024. All rights reserved.