I have initialized a cluster with 10 workers and 4 threads per worker, and I am running it on a 12-core laptop.
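cluster = makeIndividualDashboard.LocalCluster(n_workers=10, threads_per_worker=4)
# Pass the cluster explicitly: Client() with no address would start its own default LocalCluster
client = makeIndividualDashboard.Client(cluster)
runOna(client)
client.shutdown()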
Below is the code in which I run the computation on the cluster:
import dask
from collections import namedtuple

st = settings.as_dict()
new_settings = namedtuple("Settings", st.keys())(*st.values())
to_process = []
client.cluster.scale(10)
if mongoConnection:
    mongo_c = True
else:
    mongo_c = None
# scatter() on a list returns a list of futures, one per object
futures = client.scatter([net, new_settings, avgNodesConnected, kcoreByGroup, averageTeamDensity,
                          edgesInByAttributeTableMeans, edgesInByAttributeTable, crossTeamTiesTable,
                          descendentLookup, groupDegreeTable, respondentDegreeTable, degreeTable,
                          orgTeamTree, teamMembership, graphId, selectionRange, criteria,
                          onlyForNodes, hashIds, useEnvironment, rollupToLeaders, averageTeamSize,
                          meanCrossInTiesPct, meanCrossOutTiesPct, meanCrossAllTiesPct, mongo_c])
for node in nodes:
    if FILTER_FOR_USER is None or node == FILTER_FOR_USER:
        to_process.append(dask.delayed(run_me)(node, *futures))
dask.compute(*to_process)
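Yes, this looks a bit messy because run_me is a very large function that I can't easily modularize any further for now. The problem is that this works fine with 5 or fewer workers, but as soon as I increase the number of workers, I get a serialization error: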
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File "/Users/omtripa/anaconda3/envs/ONA-Transformation/lib/python3.7/site-packages/distributed/protocol/core.py", line 44, in dumps
for key, value in data.items()
File "/Users/omtripa/anaconda3/envs/ONA-Transformation/lib/python3.7/site-packages/distributed/protocol/core.py", line 45, in <dictcomp>
if type(value) is Serialize
File "/Users/omtripa/anaconda3/envs/ONA-Transformation/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 210, in serialize
TypeError: ('Could not serialize object of type float64.', '0.68')
distributed.comm.utils - ERROR - ('Could not serialize object of type float64.', '0.68')
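Again, this is strange, because when I run the same code on a Linux server with 35 cores and 30 workers, it works fine, so I'm not sure what the problem is. Is this specific to my local machine? I could look into serialization issues, but why does it only work with 5 or fewer workers?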
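Since the traceback only says that a float64 could not be serialized, one way to hunt for the culprit is to try pickling each scattered argument on its own, outside of Dask. This is a minimal sketch: `find_unpicklable` is a hypothetical helper, not part of Dask. It uses the standard `pickle` module, which is stricter than cloudpickle (which Dask can also use), so anything it flags is a suspect rather than proof, and it cannot by itself explain why the failure depends on the number of workers.

```python
import pickle
from typing import Any, Dict


def find_unpicklable(named_args: Dict[str, Any]) -> Dict[str, str]:
    """Try to pickle each value on its own; return {name: error} for the ones that fail.

    Hypothetical debugging helper: it uses the standard pickle module only, which
    rejects some objects (lambdas, locally defined functions) that cloudpickle accepts.
    """
    failures = {}
    for name, value in named_args.items():
        try:
            pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL)
        except Exception as exc:  # TypeError, PicklingError, AttributeError, ...
            failures[name] = f"{type(exc).__name__}: {exc}"
    return failures


if __name__ == "__main__":
    # Hypothetical sample values; replace them with the objects passed to client.scatter.
    sample = {
        "averageTeamDensity": 0.68,
        "generator": (i for i in range(3)),  # generators cannot be pickled
    }
    print(find_unpicklable(sample))
```

With the code above, the dictionary would map each name in the `client.scatter([...])` list to its value, e.g. `{"net": net, "new_settings": new_settings, ...}`.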
Thanks a lot for your help.
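The error says that you are trying to send some object to a worker and that object can't be serialized. The type is float64, so perhaps a numpy.float64 object? I can't really tell from what you've posted. I have verified that Dask moves NumPy float64 objects around just fine: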
In [1]: from dask.distributed import Client
In [2]: client = Client()
In [3]: import numpy as np
In [4]: x = np.float64(1)
In [5]: future = client.scatter(x)
In [6]: future.result()
Out[6]: 1.0
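If you still suspect a NumPy scalar such as the `0.68` in the traceback, one quick experiment is to convert NumPy scalars to built-in Python values before scattering and check whether the error goes away. This is a minimal sketch under that assumption: `to_builtin` is a hypothetical helper. It only walks lists, tuples (including namedtuples such as `new_settings`) and dicts, and it leaves arrays, DataFrames and other containers untouched.

```python
from typing import Any

import numpy as np


def to_builtin(value: Any) -> Any:
    """Recursively replace NumPy scalars (numpy.float64, numpy.int64, ...) with Python ones.

    Hypothetical helper: only lists, tuples, namedtuples and dicts are walked;
    arrays, DataFrames and other objects are returned unchanged.
    """
    if isinstance(value, np.generic):
        return value.item()  # e.g. numpy.float64(0.68) -> 0.68 as a Python float
    if isinstance(value, dict):
        return {key: to_builtin(item) for key, item in value.items()}
    if isinstance(value, tuple) and hasattr(value, "_fields"):
        # namedtuples take their fields as positional arguments, not one iterable
        return type(value)(*(to_builtin(item) for item in value))
    if isinstance(value, (list, tuple)):
        return type(value)(to_builtin(item) for item in value)
    return value
```

For example, `futures = client.scatter(to_builtin([net, new_settings, ...]))`. Treat this as an experiment only: if the error persists, the float64 is likely nested inside an object the helper does not walk.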
I encourage you to provide an MCVE. See https://stackoverflow.com/help/minimal-reproducible-example