为什么我的Dask期货卡在'pending'中而永远不会结束?

问题描述 投票:1回答:1

我有一些长时间运行的代码(约5-10分钟的处理时间),我想以Dask Future的身份运行。这是一系列几个离散的步骤,我可以将其作为一个函数运行:

result : Future = client.submit(my_function, arg1, arg2)

或者我可以分成几个中间步骤:

# compose the result from the same intermediate results but with Futures
intermediate1 = client.submit(my_function1, arg1)
intermediate2 = client.submit(my_function2, arg1, arg2)
intermediate3 = client.submit(my_function3, intermediate2, arg1)
result = client.submit(my_function4, intermediate3)

如果我在本地运行此命令(例如result = my_function(arg1, arg2)),则完成。如果我将其提交给Dask,我会立即按预期将我的Future退还给我,但工作从未完成。此外,如果我将result.key用作跟踪作业状态的方式,后来又将未来重构为result = Future(key),则它[[总是的状态为pending

我想首先按原样运行它,这样我就可以将我的处理工作转移给我的Dask工作者,而不是处理请求的API,然后我希望能够开始在各个节点之间分配工作,提高性能。但是,为什么我的工作刚刚消失?查看我的Dask Scheduler Web界面,似乎没有显示作业。但是我知道Dask正在工作,因为我可以从Jupyter笔记本中向其提交代码。

我正在从Flask服务器中调用client.submit,并且正在返回密钥,以便以后可以使用。大致:

@app.route('/submit') def submit(): # ... future = client.submit(my_function, arg1, arg2) return jsonify({"key": future.key}) @app.route('/status/<key>') def status(key): future = Future(key) return jsonify({"status": future.status})

当我的应用程序部署到Kubernetes时,我的/submit路由会获得一个Future键,但是我的Dask状态页面没有显示任何处理任务。如果我在本地运行Flask,则确实会看到一个任务显示,并且在预期的延迟之后会显示我的作业

does

的输出;但是,当我用从/status/<key>返回的Future键命中自己的/submit路径时,它始终显示状态为pending
python dask dask-distributed dask-kubernetes
1个回答
0
投票
如果所有指向任务的期货都消失了,那么Dask可以随意忘记该任务。这使Dask可以清理工作,而不是让所有中间结果永远存在。

如果您要保留参考,则需要保留期货。这告诉Dask您仍然在意结果。您可以在烧瓶应用中通过创建字典在本地进行此操作。

futures = {} @app.route('/submit') def submit(): # ... future = client.submit(my_function, arg1, arg2) futures[future.key] = future return jsonify({"key": future.key}) @app.route('/status/<key>') def status(key): future = futures[key] return jsonify({"status": future.status})

但是您还需要考虑何时可以清理并释放这些期货。通过这种方法,您将慢慢填满您的内存。    
© www.soinside.com 2019 - 2024. All rights reserved.