Dask分布式按顺序执行任务

问题描述 投票:1回答:1

我有一个与LocalCluster合作的管道:

from distributed import Client
client = Client()

list_of_queries = [...]  # say 1_000 queries

loaded_data = client.map(sql_data_loader, list_of_queries)

processed_data = client.map(data_processor, loaded_data)

writer_results = client.map(data_writer, processed_data)

results = client.gather(writer_results)

一切正常,但并不像我期望的那样。

查看仪表板的状态页面,我看到这样的事情:

sql_data_loader             900 / 1000
data_processor                0 / 1000
data_writer                   0 / 1000

即任务是按顺序执行的,而不是“并行”。因此,data_processor在加载所有1000个查询之前不会开始执行。并且data_writer等待'data_processor'完成所有期货的处理。

基于之前使用qaskxswpoi而不是dask.delayed的dask经验预期的行为将是这样的:

client.map

这是一个错误的期望还是我缺少如何设置管道以确保行为类似于sql_data_loader 50 / 1000 data_processor 10 / 1000 data_writer 5 / 1000

dask dask-distributed
1个回答
1
投票

如果你一个接一个地运行地图,那么一切都应该很好地管道。

两个预期目标之间存在一些紧张关系:

  1. 任务应该管道,如你所愿
  2. 首先提交的任务应具有更高的优先级

为了在这两个目标之间取得平衡,Dask根据呼叫之间的延迟来分配策略。如果两个映射调用紧接着发生,那么Dask假定它们是同一计算的一部分,但是如果它们相隔很长时间,则Dask假定它们是不同的计算,因此优先考虑先前的任务。您可以使用dask.delayed关键字来控制它

fifo_timeout

这是相关的client.map(f, ..., fifo_timeout='10 minutes')

下面是一个示例,显示了将地图调用捆绑在一起时所需的行为:

documentation page
© www.soinside.com 2019 - 2024. All rights reserved.