我有一个与LocalCluster
合作的管道:
from distributed import Client
client = Client()
list_of_queries = [...] # say 1_000 queries
loaded_data = client.map(sql_data_loader, list_of_queries)
processed_data = client.map(data_processor, loaded_data)
writer_results = client.map(data_writer, processed_data)
results = client.gather(writer_results)
一切正常,但并不像我期望的那样。
查看仪表板的状态页面,我看到这样的事情:
sql_data_loader 900 / 1000
data_processor 0 / 1000
data_writer 0 / 1000
即任务是按顺序执行的,而不是“并行”。因此,data_processor
在加载所有1000个查询之前不会开始执行。并且data_writer
等待'data_processor'完成所有期货的处理。
基于之前使用qaskxswpoi而不是dask.delayed
的dask经验预期的行为将是这样的:
client.map
这是一个错误的期望还是我缺少如何设置管道以确保行为类似于sql_data_loader 50 / 1000
data_processor 10 / 1000
data_writer 5 / 1000
?
如果你一个接一个地运行地图,那么一切都应该很好地管道。
两个预期目标之间存在一些紧张关系:
为了在这两个目标之间取得平衡,Dask根据呼叫之间的延迟来分配策略。如果两个映射调用紧接着发生,那么Dask假定它们是同一计算的一部分,但是如果它们相隔很长时间,则Dask假定它们是不同的计算,因此优先考虑先前的任务。您可以使用dask.delayed
关键字来控制它
fifo_timeout
这是相关的client.map(f, ..., fifo_timeout='10 minutes')
下面是一个示例,显示了将地图调用捆绑在一起时所需的行为:
documentation page