环境信息:
Python==3.7
Win7 i5-2310:2-cores-4-threads 4GB-memory
Dask Ini:
from dask.distributed import Client
Client()
# Client Scheduler: tcp://127.0.0.1:50723 Dashboard:
# http://127.0.0.1:8787/status Cluster Workers: 4 Cores: 4 Memory: 4.20 GB
[第一个:带For循环的延迟功能:
@dask.delayed
def teststr(x,y):
return x+y
rt=[]
for i in range(1000):
rt.append(teststr(str(i),str(i+1)) )
rt = dask.compute(*rt)
显示Wall time: 3.23 s
第二个:具有地图功能的基于列表的包:
import dask.bag as db
b=db.from_sequence([(str(i),str(i+1)) for i in range(1000)])
def teststr2(x,y):
return x+y
%%time
rt2=b.map(lambda x:teststr2(x[0],x[1])).compute()
它显示Wall time: 899 ms
forloop的时间比bag的时间长得多,而且磁盘IO花费很多(可能是虚拟内存)。
我没有检查如果更改dask.client configure(thread vs process),不同的任务(Integer function vs string funcion vs function without GIL vs database IO vs Disk IO)或更大的对象(与大型DataFrame或Numpy,同时执行一些数据分析工作。]
如何在dask中为python paralell作业选择更好的方法。尽管我得到了相同的输出,但有时需要花费太多时间进行计算。必须有一些规则可以帮助我。
我希望这可能是一个备忘单,可以告诉我基于System-Env,任务种类或其他内容组织代码的最佳方法。
两个选项之间的主要区别是任务数。您可以执行len(thing.dask)
快速查看计算给定的dask对象(延迟或bag)所需的图。
>>> rt2=b.map(lambda x:teststr2(x[0],x[1]))
>>> len(rt2.dask)
200
>>> rt=[]
>>> for i in range(1000):
... rt.append(teststr(str(i),str(i+1)) )
>>> sum(len(t.dask) for t in rt)
1000
因此,数据包中的任务数量减少了五倍,这是因为数据包在输入(也称为分区)上进行了批量计算。由于您请求的计算功能发生得如此之快,因此总时间完全取决于每个任务的开销以及将任务图发送到调度程序所花费的时间。