Kubernetes和Dask和调度器

问题描述 投票:4回答:1

我的代码看起来像这样

def myfunc(param):
    # expensive stuff that takes 2-3h

mylist = [...]
client = Client(...)
mgr = DeploymentMgr()
# ... setup stateful set ...
futures = client.map(myfunc, mylist, ..., resources={mgr.hash.upper(): 1})
client.gather(futures)

我有一个Kubernetes集群上运行的dask。在程序开始时,我创建了一个有状态的集合。这是通过 kubernetes.client.AppsV1Api(). 然后我最多等待30分钟,直到我申请的所有工人都可用。在这个例子中,假设我申请了10个工人,但30分钟后,只有7个工人可用。最后,我呼叫 client.map() 并传递一个函数和一个列表给它。这个列表有10个元素。然而,dask 只会使用 7 个 Worker 来处理这个列表!即使几分钟后剩下的 3 个 Worker 可用,dask 也不会将任何列表元素分配给他们,即使没有处理第一个元素。即使过了几分钟,剩下的 3 个工作者可用,dask 也不会将任何列表元素分配给他们,即使第一个元素的处理都没有完成。

我怎样才能改变dask的这种行为?有没有办法告诉dask(或dask的调度器)定期检查新来的工人,并更 "正确 "地分配工作?或者我可以手动影响这些列表元素的分配?

谢谢你。

python kubernetes dask
1个回答
4
投票

Dask一旦更好地了解任务所需时间,就会平衡负载。 你可以用配置值来估计任务长度。

distributed:
  scheduler:
    default-task-durations:
      myfunc: 1hr

或者说,一旦Dask完成了其中一个任务,它就会知道未来如何围绕这个任务做出决策。

我相信这在GitHub问题跟踪器上也出现过几次。 你可能想通过搜索 https:/github.comdaskdistributedissues。 更多信息。

© www.soinside.com 2019 - 2024. All rights reserved.