我有一个 Dask 工作集群,我想用它们来使用复杂模型并行化预测操作。模型文件很大并且需要时间来加载,因此我使用 client.run 让所有工作人员运行初始化函数来加载该模型。
如何从我的
client.run
函数中保留 Python 变量状态,以便我可以在将来的任务操作中引用并使用它?
我找到了
dask.distributed.get_worker
和 worker.data
字典,并使用它来设置任意值,然后我可以在 map_partition
函数中访问,但不确定这是否是最好或最安全的选择。
如果一个worker死亡并重新启动,或者如果其他worker加入集群,有没有办法让这些worker自动调用我最初传递给的相同函数
client.run
?
如果你的模型/状态不变,那么我可能只会使用
client.scatter
将其发送出去,并让 Dask 根据需要复制它。这是最简单的方法,也是最稳健的方法。如果有新工人到达,它会根据需要复制它。
但是,如果您想自己管理状态,那么是的,运行一个函数,获取一些状态,并将其附加到工作线程是一个好主意:
get_worker().my_special_state = x
我不建议将数据放入
get_worker().data
,因为这是 Dask 管理自己内存的地方。当它看到它不知道的异物时,它可能会感到困惑。事情应该会好起来,但你永远不知道。
如果一个工作线程死亡并重新启动,或者如果其他工作线程加入集群,有没有办法让这些工作线程自动调用我最初传递给 client.run 的相同函数?
是的,这里最简单的方法是使用预加载脚本或工作插件。请参阅https://docs.dask.org/en/latest/customize-initialization.html