进口:
from dask.distributed import Client
import streamz
import time
模拟的工作量:
def increment(x):
time.sleep(0.5)
return x + 1
假设我想在本地Dask客户端上处理一些工作负载:
if __name__ == "__main__":
with Client() as dask_client:
ps = streamz.Stream()
ps.scatter().map(increment).gather().sink(print)
for i in range(10):
ps.emit(i)
这将按预期工作,但是sink(print)
当然会强制执行等待每个结果,因此流将不会并行执行。
但是,如果我使用buffer()
允许结果被缓存,则gather()
似乎不再正确地收集所有结果,并且解释器在获取结果之前退出。这种方法:
if __name__ == "__main__":
with Client() as dask_client:
ps = streamz.Stream()
ps.scatter().map(increment).buffer(10).gather().sink(print)
# ^
for i in range(10): # - allow parallel execution
ps.emit(i) # - before gather()
... 不显示任何结果对我来说。 Python解释器仅在启动脚本后不久退出,并且before buffer()
发出其结果,因此打印了[[nothing。
if __name__ == "__main__":
with Client() as dask_client:
ps = streamz.Stream()
ps.scatter().map(increment).buffer(10).gather().sink(print)
for i in range(10):
ps.emit(i)
time.sleep(10) # <- force main process to wait while ps is working
为什么?我认为?导入:从dask.distributed import客户端导入streamz导入时间模拟工作量:def增量(x):time.sleep(0.5)返回x + 1让我们假设我想处理一些工作量...gather()
应该等待一批10个结果,因为buffer()
应该在将它们刷新到gather()
之前准确地并行缓存10个结果。为什么gather()
在这种情况下不阻塞?为了防止主进程过早退出,是否有一种不错的方法
检查Stream是否仍然包含正在处理的元素