带有分布式调度程序的dask.delayed KeyError

问题描述 投票:0回答:1

我有一个用C编写并用interpolate_to_particles包裹的函数ctypes。我想使用dask.delayed对该函数进行一系列调用。

该代码成功运行,没有任何麻烦

# Interpolate w/o dask
result = interpolate_to_particles(arg1, arg2, arg3)

并且在single-threaded模式下具有分布式时间表

# Interpolate w/ dask
from dask.distributed import Client
client = Client()
result = dask.delayed(interpolate_to_particles)(arg1, arg2, arg3)
result_c = result.compute(scheduler='single-threaded')

但是如果我改打电话给

result_c = result.compute()

我收到以下KeyError:

> Traceback (most recent call last):   File
> "/path/to/lib/python3.6/site-packages/distributed/worker.py",
> line 3287, in dumps_function
>     result = cache_dumps[func]   File "/path/to/lib/python3.6/site-packages/distributed/utils.py",
> line 1518, in __getitem__
>     value = super().__getitem__(key)   File "/path/to/lib/python3.6/collections/__init__.py",
> line 991, in __getitem__
>     raise KeyError(key) KeyError: <function interpolate_to_particles at 0x1228ce510>

从快捷仪表板访问的工作日志不提供任何信息。实际上,除了启动之外,我没有看到任何信息表明工人做了什么。

关于可能发生的事情的任何想法,或者可以用来进一步调试的建议工具?谢谢!

ctypes dask keyerror dask-distributed dask-delayed
1个回答
0
投票

给出您的注释,听起来您的函数序列化不好。要对此进行测试,您可以尝试在一个进程中对函数进行酸洗,然后在另一个过程中对它进行解酸。

>>> import pickle
>>> print(pickle.dumps(interpolate_to_particles))
b'some bytes printed out here'

然后在另一个过程中

>>> import pickle
>>> interpolate_to_particles = pickle.loads(b'the same bytes you had before')

如果这不起作用,那么您会知道那是您的问题。我鼓励您查找“如何确保ctypes函数可序列化”或类似的内容,或者在Stack Overflow上问另一个较小范围的问题。

© www.soinside.com 2019 - 2024. All rights reserved.