我正在按照文档中提供的示例使用某些内容
import dask.bag
from dask_kubernetes import KubeCluster
cluster = KubeCluster.from_yaml('worker-spec.yml')
cluster.adapt(minimum=0, maximum=24, interval="20000ms")
dag = dask.bag.from_sequence(tasks).map(lambda x: make_task(x).execute())
with distributed.Client(dask_cluster) as client:
results = dag.compute(scheduler=client)
cluster.close()
在我的情况下,execute()
函数执行大量的IO操作,大约需要5-10分钟才能运行。我想以某种方式配置KubeCluster
和dask调度程序,以使这些长时间运行的任务顺利进行的机会最大化。
我的问题分为两个部分。首先,如何覆盖distributed
配置设置?我想尝试类似的东西
dask.config.set({'scheduler.work-stealing': False})
但是我不知道在哪里设置这个合适的位置。具体来说,我不知道这是每个工作人员都应该意识到的事情,还是只有在实例化KubeCluster
的时候才可以指定的东西。
我的问题的第二部分与对长时间运行(超过几分钟)的任务的建议有关。我一直在尝试使用默认设置。有时一切正常,有时compute()
调用失败,但有以下异常:
<... omitting caller from the traceback ...> File "/usr/local/lib/python3.7/site-packages/dask/base.py", line 436, in compute results = schedule(dsk, keys, **kwargs) File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 2587, in get results = self.gather(packed, asynchronous=asynchronous, direct=direct) File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1885, in gather asynchronous=asynchronous, File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 767, in sync self.loop, func, *args, callback_timeout=callback_timeout, **kwargs File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 345, in sync raise exc.with_traceback(tb) File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 329, in f result[0] = yield future File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run value = future.result() File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1741, in _gather raise exception.with_traceback(traceback) distributed.scheduler.KilledWorker: ("('lambda-364defe33868bf6e4864da2933065a12', 3)", <Worker 'tcp://172.18.7.71:39029', name: 9, memory: 0, processing: 4>)
我正在从master分支运行最近的提交:
dask-kubernetes@git+git://github.com/dask/dask-kubernetes.git@add93d56ba1ac2f7d00576bd3f2d1be0db3e1757
。
编辑:
我更新了代码片段,以显示我正在将最小工作线程数设置为0的情况下调用adapt()
函数。我开始怀疑,如果工作线程数达到0,是否可能导致调度程序在返回compute()
结果之前关闭。
您可以通过设置configuration YAML files修改environment variables来覆盖设置。
因此,您可以更新~/.config/dask/distributed.yaml
文件。
distributed:
scheduler:
work-stealing: false
或通过设置环境变量。
export DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING=False
有时一切正常,有时compute()调用失败,但出现以下异常...AKilledWorker
异常的发生有多种原因。我们已将a documentation page纳入常见案例。
我最经常发现的原因是该任务使用了比可用内存更多的内存,并且被OOM杀手杀死。