使用dask-distributed如何从队列提供的长时间运行的任务中生成未来

问题描述 投票:3回答:1

我正在使用这个示例http://matthewrocklin.com/blog/work/2017/02/11/dask-tensorflow的磁盘分布式长时间运行任务,其中长时间运行的工作任务从tensorflow示例中的队列获取其输入,并将其结果传递到输出队列。 (我没有看到最新版本的dask中的示例中使用的频道)。

我可以看到如何分散列表并应用映射来生成将输入数据推送到工作者输入队列的期货列表。

def transfer_dask_to_worker(batch):
    worker = get_worker()
    worker.tensorflow_queue.put(batch)

data = [1,2,3,4] 

future_data = e.scatter(data)

tasks = e.map(transfer_dask_to_worker, future_data ,
     workers=dask_spec['worker'], pure=False)

现在,如果我们等待worker消耗任务,则所有结果都将在worker的输出队列中。我们可以将它全部用完

def transfer_worker_to_dask(arg):
    worker = get_worker()
    return worker.output_queue.get()

results = e.map(transfer_worker_to_dask,range(len(tasks)))

只要我们通过在调用它们之前等待所有工作任务完成来手动处理排序,这样就可以正常工作。

我们如何将输出期货与输入的下游联系起来?有没有办法让长时间运行的任务在工人上创建可以收回调度程序任务的未来?

我尝试让transfer_dask_to_worker(批处理)也查询输出队列并返回结果:

def transfer_dask_to_worker_and_return(batch):
    worker = get_worker()
    worker.tensorflow_queue.put(batch)
    return worker.output_queue.get()

这适用于短名单,但开始失败,取消了1000个项目的未来。

提前致谢。

dask dask-distributed
1个回答
© www.soinside.com 2019 - 2024. All rights reserved.