Dask distributed library gives a serialization error

I initialized my cluster with 10 workers and 4 threads per worker, and I am running it on a laptop with 12 cores.

    cluster = makeIndividualDashboard.LocalCluster(n_workers=10, threads_per_worker=4)
    client = makeIndividualDashboard.Client()
    runOna(client)
    client.shutdown()

Below is the code where I run the computation on the cluster.

    st = settings.as_dict()
    new_settings = namedtuple("Settings", st.keys())(*st.values())
    to_process = []
    client.cluster.scale(10)
    if mongoConnection:
        mongo_c = True
    else:
        mongo_c = None
    future = client.scatter([net, new_settings, avgNodesConnected, kcoreByGroup, averageTeamDensity,
                             edgesInByAttributeTableMeans, edgesInByAttributeTable, crossTeamTiesTable,
                             descendentLookup, groupDegreeTable, respondentDegreeTable, degreeTable,
                             orgTeamTree, teamMembership, graphId, selectionRange, criteria,
                             onlyForNodes, hashIds, useEnvironment, rollupToLeaders, averageTeamSize,
                             meanCrossInTiesPct, meanCrossOutTiesPct, meanCrossAllTiesPct, mongo_c])
    for node in nodes:
        if FILTER_FOR_USER == None or node == FILTER_FOR_USER:
            to_process.append(dask.delayed(run_me)(node, *future))

    dask.compute(*to_process)

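Yes, this looks a bit messy, because run_me is a very large function that I may not be able to modularize any better for now. The problem is that this works fine with 5 or fewer workers, but as soon as I increase the number of workers it gives me a serialization error.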

    distributed.protocol.core - CRITICAL - Failed to Serialize
    Traceback (most recent call last):
      File "/Users/omtripa/anaconda3/envs/ONA-Transformation/lib/python3.7/site-packages/distributed/protocol/core.py", line 44, in dumps
        for key, value in data.items()
      File "/Users/omtripa/anaconda3/envs/ONA-Transformation/lib/python3.7/site-packages/distributed/protocol/core.py", line 45, in <dictcomp>
        if type(value) is Serialize
      File "/Users/omtripa/anaconda3/envs/ONA-Transformation/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 210, in serialize
    TypeError: ('Could not serialize object of type float64.', '0.68')
    distributed.comm.utils - ERROR - ('Could not serialize object of type float64.', '0.68')

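Again, this is strange: on a Linux server with 35 cores the same code works fine with 30 workers, so I am not sure what the problem is. Is it specific to my local machine? I can look into the serialization issue, but why does it work only with 5 workers?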

Any help is much appreciated.

python-3.x multiprocessing dask dask-distributed dask-delayed
1 Answer

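The error says that you are trying to send an object to a worker and that object cannot be serialized. The type is float64, which is maybe a numpy.float64 object? I don't really know what object you have. I have verified that Dask moves NumPy float64 objects around just fine: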

    In [1]: from dask.distributed import Client

    In [2]: client = Client()

    In [3]: import numpy as np

    In [4]: x = np.float64(1)

    In [5]: future = client.scatter(x)

    In [6]: future.result()
    Out[6]: 1.0
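
Since a plain numpy.float64 round-trips, it may help to check exactly which types are hiding inside the arguments being scattered. The following is only a sketch using the standard library: `qualified_name` and `type_census` are hypothetical helper names, and the `example` dictionary is made-up data standing in for one of the real arguments. A NumPy scalar would be reported as `numpy.float64`, while a float64 from some other package would show a different module prefix.

```python
from collections import Counter


def qualified_name(value):
    """Return 'module.TypeName' for a value, e.g. 'builtins.float'."""
    cls = type(value)
    return f"{cls.__module__}.{cls.__qualname__}"


def type_census(obj, counts=None, seen=None):
    """Count the types of all values reachable through dicts, lists, tuples and sets."""
    counts = Counter() if counts is None else counts
    seen = set() if seen is None else seen
    counts[qualified_name(obj)] += 1

    if isinstance(obj, dict):
        children = list(obj.keys()) + list(obj.values())
    elif isinstance(obj, (list, tuple, set, frozenset)):
        children = list(obj)
    else:
        return counts  # leaf value, nothing to descend into

    if id(obj) in seen:  # guard against self-referencing containers
        return counts
    seen.add(id(obj))

    for child in children:
        type_census(child, counts, seen)
    return counts


# Made-up stand-in for one of the objects passed to client.scatter(...)
example = {"density": 0.68, "sizes": [3, 4, 5], "label": ("team", None)}
census = type_census(example)
for name, count in sorted(census.items()):
    print(f"{name}: {count}")
```

Running this over each argument in turn should show whether the value 0.68 is a builtin float, a NumPy scalar, or something else.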

I encourage you to provide an MCVE. See https://stackoverflow.com/help/minimal-reproducible-example
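
One possible way to narrow the problem down toward such an example is to try a pickle round trip on each argument separately and see which one fails. This is a rough sketch, not Dask's own serialization path: Dask has several serializers and falls back to pickle and cloudpickle for general Python objects, so a failure with the standard-library `pickle` is a hint rather than proof. The helper name `find_unpicklable` and the `candidates` dictionary are assumptions made up for illustration.

```python
import pickle
import threading


def find_unpicklable(named_objects):
    """Return {name: error message} for each object that fails a pickle round trip."""
    failures = {}
    for name, obj in named_objects.items():
        try:
            pickle.loads(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))
        except Exception as exc:  # the exception type varies with the object
            failures[name] = f"{type(exc).__name__}: {exc}"
    return failures


# Made-up stand-ins for the objects passed to client.scatter(...)
candidates = {
    "average_density": 0.68,
    "team_sizes": [3, 4, 5],
    "lock": threading.Lock(),  # locks cannot be pickled
}

failures = find_unpicklable(candidates)
for name, message in failures.items():
    print(f"{name} failed: {message}")
```

Once the failing argument is known, a script that scatters only that object would already be close to a minimal reproducible example.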
