Streamz / Dask:收集不等待缓冲区的所有结果

问题描述 投票:4回答:1

进口:

from dask.distributed import Client
import streamz
import time

模拟的工作量:

def increment(x):
    time.sleep(0.5)
    return x + 1

假设我想在本地Dask客户端上处理一些工作负载:

if __name__ == "__main__":
    with Client() as dask_client:
        ps = streamz.Stream()
        ps.scatter().map(increment).gather().sink(print)

        for i in range(10):
            ps.emit(i)

这将按预期工作,但是sink(print)当然会强制执行等待每个结果,因此流将不会并行执行。

但是,如果我使用buffer()允许结果被缓存,则gather()似乎不再正确地收集所有结果,并且解释器在获取结果之前退出。这种方法:

if __name__ == "__main__":
    with Client() as dask_client:
        ps = streamz.Stream()
        ps.scatter().map(increment).buffer(10).gather().sink(print)
                                     # ^
        for i in range(10):          # - allow parallel execution 
            ps.emit(i)               # - before gather()

... 不显示任何结果对我来说。 Python解释器仅在启动脚本后不久退出,并且before buffer()发出其结果,因此打印了[[nothing。

但是,如果主进程被迫等待一段时间,结果将以并行方式打印(因此,它们不会彼此等待,而是几乎同时打印):

if __name__ == "__main__": with Client() as dask_client: ps = streamz.Stream() ps.scatter().map(increment).buffer(10).gather().sink(print) for i in range(10): ps.emit(i) time.sleep(10) # <- force main process to wait while ps is working

为什么?我认为gather()应该等待一批10个结果,因为buffer()应该在将它们刷新到gather()之前准确地并行缓存10个结果。为什么gather()在这种情况下不阻塞?

为了防止主进程过早退出,是否有一种不错的方法

检查Stream是否仍然包含正在处理的元素

导入:从dask.distributed import客户端导入streamz导入时间模拟工作量:def增量(x):time.sleep(0.5)返回x + 1让我们假设我想处理一些工作量...
python stream dask dask-distributed streamz
1个回答
1
投票
  1. “为什么?”:因为Dask分布式调度程序(执行流映射器和接收器功能)和python脚本在不同的进程中运行。当“ with”块上下文结束时,在发送到流的项目能够到达接收器功能之前,Dask Client将关闭并且执行将关闭。
© www.soinside.com 2019 - 2024. All rights reserved.