Tags: dask, python, stream, dask-distributed, streamz

Streamz/Dask: gather does not wait for all results of buffer

Published on 2020-04-15 09:57:46

Imports:

from dask.distributed import Client
import streamz
import time

Simulated workload:

def increment(x):
    time.sleep(0.5)
    return x + 1

Let's suppose I'd like to process some workload on a local Dask client:

if __name__ == "__main__":
    with Client() as dask_client:
        ps = streamz.Stream()
        ps.scatter().map(increment).gather().sink(print)

        for i in range(10):
            ps.emit(i)

This works as expected, but sink(print) will, of course, wait for each result, so the stream does not execute in parallel.

However, if I use buffer() to allow results to be cached, then gather() no longer seems to collect all results, and the interpreter exits before the results arrive. This approach:

if __name__ == "__main__":
    with Client() as dask_client:
        ps = streamz.Stream()
        ps.scatter().map(increment).buffer(10).gather().sink(print)
                                     # ^
        for i in range(10):          # - allow parallel execution 
            ps.emit(i)               # - before gather()

...does not print any results for me. The Python interpreter exits shortly after starting the script, before buffer() emits its results, so nothing gets printed.

However, if the main process is forced to wait for some time, the results are printed in a parallel fashion (they do not wait for each other, but are printed nearly simultaneously):

if __name__ == "__main__":
    with Client() as dask_client:
        ps = streamz.Stream()
        ps.scatter().map(increment).buffer(10).gather().sink(print)

        for i in range(10):
            ps.emit(i)

        time.sleep(10)  # <- force main process to wait while ps is working
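
A fixed sleep works but requires guessing how long the workload takes. As an illustrative sketch (not part of the original question), the wait can be made deterministic by collecting the results with streamz's sink_to_list() and polling until the expected number of items has arrived:

if __name__ == "__main__":
    with Client() as dask_client:
        ps = streamz.Stream()
        # sink_to_list() returns a plain list that the stream appends results to
        results = ps.scatter().map(increment).buffer(10).gather().sink_to_list()

        for i in range(10):
            ps.emit(i)

        # Poll until all 10 results have arrived instead of sleeping
        # for a fixed amount of time.
        while len(results) < 10:
            time.sleep(0.1)

        print(results)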

Why is that? I thought gather() should wait for a batch of 10 results since buffer() should cache exactly 10 results in parallel before flushing them to gather(). Why does gather() not block in this case?

Is there a nice way to otherwise check if a Stream still contains elements being processed in order to prevent the main process from exiting prematurely?

Questioner: daniel451
Answered by Thomas Moerman, 2020-02-07 19:29
  1. "Why is that?": because the Dask distributed scheduler (which executes the stream mapper and sink functions) and your python script run in different processes. When the "with" block context ends, your Dask Client is closed and execution shuts down before the items emitted to the stream are able reach the sink function.

  2. "Is there a nice way to otherwise check if a Stream still contains elements being processed": not that I am aware of. However: if the behaviour you want is (I'm just guessing here) the parallel processing of a bunch of items, then Streamz is not what you should be using, vanilla Dask should suffice.