我有一些长时间运行的代码(约5-10分钟的处理时间),我想以Dask Future
的身份运行。这是一系列几个离散的步骤,我可以将其作为一个函数运行:
result : Future = client.submit(my_function, arg1, arg2)
或者我可以分成几个中间步骤:
# compose the result from the same intermediate results but with Futures
intermediate1 = client.submit(my_function1, arg1)
intermediate2 = client.submit(my_function2, arg1, arg2)
intermediate3 = client.submit(my_function3, intermediate2, arg1)
result = client.submit(my_function4, intermediate3)
如果我在本地运行此命令(例如result = my_function(arg1, arg2)
),则完成。如果我将其提交给Dask,我会立即按预期将我的Future
退还给我,但工作从未完成。此外,如果我将result.key
用作跟踪作业状态的方式,后来又将未来重构为result = Future(key)
,则它[[总是的状态为pending
。
我正在从Flask服务器中调用client.submit
,并且正在返回密钥,以便以后可以使用。大致:
@app.route('/submit')
def submit():
# ...
future = client.submit(my_function, arg1, arg2)
return jsonify({"key": future.key})
@app.route('/status/<key>')
def status(key):
future = Future(key)
return jsonify({"status": future.status})
当我的应用程序部署到Kubernetes时,我的的输出;但是,当我用从/submit
路由会获得一个Future键,但是我的Dask状态页面没有显示任何处理任务。如果我在本地运行Flask,则确实会看到一个任务显示,并且在预期的延迟之后会显示我的作业does
/status/<key>
返回的Future键命中自己的/submit
路径时,它始终显示状态为pending。 如果您要保留参考,则需要保留期货。这告诉Dask您仍然在意结果。您可以在烧瓶应用中通过创建字典在本地进行此操作。
futures = {}
@app.route('/submit')
def submit():
# ...
future = client.submit(my_function, arg1, arg2)
futures[future.key] = future
return jsonify({"key": future.key})
@app.route('/status/<key>')
def status(key):
future = futures[key]
return jsonify({"status": future.status})
但是您还需要考虑何时可以清理并释放这些期货。通过这种方法,您将慢慢填满您的内存。