我的代码看起来像这样
def myfunc(param):
# expensive stuff that takes 2-3h
mylist = [...]
client = Client(...)
mgr = DeploymentMgr()
# ... setup stateful set ...
futures = client.map(myfunc, mylist, ..., resources={mgr.hash.upper(): 1})
client.gather(futures)
我有一个Kubernetes集群上运行的dask。在程序开始时,我创建了一个有状态的集合。这是通过 kubernetes.client.AppsV1Api()
. 然后我最多等待30分钟,直到我申请的所有工人都可用。在这个例子中,假设我申请了10个工人,但30分钟后,只有7个工人可用。最后,我呼叫 client.map()
并传递一个函数和一个列表给它。这个列表有10个元素。然而,dask 只会使用 7 个 Worker 来处理这个列表!即使几分钟后剩下的 3 个 Worker 可用,dask 也不会将任何列表元素分配给他们,即使没有处理第一个元素。即使过了几分钟,剩下的 3 个工作者可用,dask 也不会将任何列表元素分配给他们,即使第一个元素的处理都没有完成。
我怎样才能改变dask的这种行为?有没有办法告诉dask(或dask的调度器)定期检查新来的工人,并更 "正确 "地分配工作?或者我可以手动影响这些列表元素的分配?
谢谢你。
Dask一旦更好地了解任务所需时间,就会平衡负载。 你可以用配置值来估计任务长度。
distributed:
scheduler:
default-task-durations:
myfunc: 1hr
或者说,一旦Dask完成了其中一个任务,它就会知道未来如何围绕这个任务做出决策。
我相信这在GitHub问题跟踪器上也出现过几次。 你可能想通过搜索 https:/github.comdaskdistributedissues。 更多信息。