diff --git a/README.rst b/README.rst index df6f223..74ba688 100644 --- a/README.rst +++ b/README.rst @@ -391,30 +391,32 @@ and print the results to your screen: send_chan, recv_chan = trio.open_memory_channel(500) - async def push_to_q(portal): - async for value in await portal.run( - __name__, 'stream_data', seed=seed - ): - # leverage trio's built-in backpressure - await send_chan.send(value) + async def push_to_chan(portal, send_chan): + async with send_chan: + async for value in await portal.run( + __name__, 'stream_data', seed=seed + ): + # leverage trio's built-in backpressure + await send_chan.send(value) - await send_chan.send(None) print(f"FINISHED ITERATING {portal.channel.uid}") # spawn 2 trio tasks to collect streams and push to a local queue async with trio.open_nursery() as n: + for portal in portals: - n.start_soon(push_to_q, portal) + n.start_soon(push_to_chan, portal, send_chan.clone()) + + # close this local task's reference to send side + await send_chan.aclose() unique_vals = set() - async for value in recv_chan: - if value not in unique_vals: - unique_vals.add(value) - # yield upwards to the spawning parent actor - yield value - - if value is None: - break + async with recv_chan: + async for value in recv_chan: + if value not in unique_vals: + unique_vals.add(value) + # yield upwards to the spawning parent actor + yield value assert value in unique_vals @@ -449,7 +451,7 @@ and print the results to your screen: print(f"STREAM TIME = {time.time() - start}") print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") - assert result_stream == list(range(seed)) + [None] + assert result_stream == list(range(seed)) return result_stream