温馨提示:本文翻译自stackoverflow.com,查看原文请点击:python - Streamz/Dask: gather does not wait for all results of buffer
dask python stream dask-distributed streamz

python - Streamz / Dask:收集不等待缓冲区的所有结果

发布于 2020-04-15 10:25:21

进口:

from dask.distributed import Client
import streamz
import time

模拟的工作量:

def increment(x):
    time.sleep(0.5)
    return x + 1

假设我想在本地Dask客户端上处理一些工作负载:

if __name__ == "__main__":
    with Client() as dask_client:
        ps = streamz.Stream()
        ps.scatter().map(increment).gather().sink(print)

        for i in range(10):
            ps.emit(i)

这可以按预期工作,但是sink(print)当然会强制等待每个结果,因此流将不会并行执行。

但是,如果我buffer()用来允许结果被缓存,那么gather()似乎不再正确地收集所有结果,并且解释器在获得结果之前退出。这种方法:

if __name__ == "__main__":
    with Client() as dask_client:
        ps = streamz.Stream()
        ps.scatter().map(increment).buffer(10).gather().sink(print)
                                     # ^
        for i in range(10):          # - allow parallel execution 
            ps.emit(i)               # - before gather()

... 不会为我打印任何结果Python解释器只是在启动脚本后不久且 buffer()发出结果之前不久退出,因此不会打印任何内容

但是,如果主进程被迫等待一段时间,则结果以并行方式打印(因此它们不会彼此等待,而是几乎同时打印):

if __name__ == "__main__":
    with Client() as dask_client:
        ps = streamz.Stream()
        ps.scatter().map(increment).buffer(10).gather().sink(print)

        for i in range(10):
            ps.emit(i)

        time.sleep(10)  # <- force main process to wait while ps is working

这是为什么?我认为gather()应该等待一批10个结果,因为buffer()应该在将它们刷新到之前并行缓存正好10个结果gather()为什么gather()在这种情况下不阻止?

是否有一种很好的方法来检查Stream是否仍然包含正在处理的元素,以防止主进程过早退出?

查看更多

提问者
daniel451
被浏览
58
Thomas Moerman 2020-02-07 19:29
  1. “为什么?”:因为Dask分布式调度程序(执行流映射器和接收器功能)和python脚本在不同的进程中运行。当“ with”块上下文结束时,将关闭Dask Client,并关闭执行,然后再将发射到流中的项目传递到接收器功能。

  2. “是否有一种很好的方法来检查Stream是否仍然包含正在处理的元素”:我不知道。但是:如果您想要的行为是(我只是在这里猜测)一堆项目的并行处理,那么Streamz不是您应该使用的对象, Vanilla Dask就足够了。