我正在处理 Lopez de Prado 所著书中的一个函数。这是我正在尝试做的一个最小的例子:
import numpy as np
import pandas as pd
import multiprocessing as mp
# Define test function
test_func = np.vectorize(lambda idx, a, b: [idx, a**1 + b**2])
test_df = pd.DataFrame(np.random.normal(size = (10, 2)), columns = ['a', 'b'])
test_df = test_df.reset_index()
func_dict = {col:test_df[col] for col in test_df.columns}
# Function used to evaluate test_func
def expandCall(kargs):
# Get the function arguement
func = kargs['func']
# Delete it from the fictionary
del kargs['func']
# Evaluate function with other arguments
out = func(**kargs)
return out
parts = np.linspace(0, test_df.shape[0], 2).astype(int)
jobs = []
for i, j in zip(parts[:-1], parts[1:]):
job = {key:func_dict[key][i:j] for key in func_dict}
job.update({'func':test_func})
jobs.append(job)
pool = mp.Pool(processes = 2)
outputs = pool.imap_unordered(expandCall, jobs)
out = []
# Process asynchronous output, report progress
for out_ in outputs:
out.append(out_)
pool.close()
pool.join()
我收到错误
PicklingError:无法 pickle
:在 --main-- 上查找属性失败
如果我正确阅读这本书,洛佩兹·德普拉多提供了一个解决方案:
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)
import copyreg, types
copyreg.pickle(types.MethodType, _pickle_method, _unpickle_method)
但是,这对我不起作用。也许是由于我正在阅读的书出版后多处理模块发生了变化。不幸的是,我不太了解该解决方案,无法更新它。任何让我的最小示例运行的帮助将不胜感激。
您发布的方法允许您pickle类方法,lambda不是类方法,至少在python3中不是,并且标准python pickler不允许pickle lambda。
您可以使用 cloudpickle 来腌制 lambda,您必须执行
obj = cloudpickle.dumps(...)
然后将 obj
传递到池中,然后将 cloudpickle.loads(obj)
传递到另一侧。