From 0b520c7beeefe7ddcf84d8b06d3fc03bdebb6e7a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 10 Mar 2019 22:13:21 -0400 Subject: [PATCH] Update streaming example in docs --- README.rst | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/README.rst b/README.rst index df6f223..74ba688 100644 --- a/README.rst +++ b/README.rst @@ -391,30 +391,32 @@ and print the results to your screen: send_chan, recv_chan = trio.open_memory_channel(500) - async def push_to_q(portal): - async for value in await portal.run( - __name__, 'stream_data', seed=seed - ): - # leverage trio's built-in backpressure - await send_chan.send(value) + async def push_to_chan(portal, send_chan): + async with send_chan: + async for value in await portal.run( + __name__, 'stream_data', seed=seed + ): + # leverage trio's built-in backpressure + await send_chan.send(value) - await send_chan.send(None) print(f"FINISHED ITERATING {portal.channel.uid}") # spawn 2 trio tasks to collect streams and push to a local queue async with trio.open_nursery() as n: + for portal in portals: - n.start_soon(push_to_q, portal) + n.start_soon(push_to_chan, portal, send_chan.clone()) + + # close this local task's reference to send side + await send_chan.aclose() unique_vals = set() - async for value in recv_chan: - if value not in unique_vals: - unique_vals.add(value) - # yield upwards to the spawning parent actor - yield value - - if value is None: - break + async with recv_chan: + async for value in recv_chan: + if value not in unique_vals: + unique_vals.add(value) + # yield upwards to the spawning parent actor + yield value assert value in unique_vals @@ -449,7 +451,7 @@ and print the results to your screen: print(f"STREAM TIME = {time.time() - start}") print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") - assert result_stream == list(range(seed)) + [None] + assert result_stream == list(range(seed)) return result_stream