From 14d60147fabbd3646abea6a8b088ba261f7eeea7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 8 Jan 2021 20:36:35 -0500 Subject: [PATCH] Add an example which breaks shielded proc waits --- examples/multiple_streams_one_portal.py | 45 +++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 examples/multiple_streams_one_portal.py diff --git a/examples/multiple_streams_one_portal.py b/examples/multiple_streams_one_portal.py new file mode 100644 index 0000000..354ec66 --- /dev/null +++ b/examples/multiple_streams_one_portal.py @@ -0,0 +1,45 @@ +import trio +import tractor + + +log = tractor.log.get_logger('multiportal') + + +async def stream_data(seed=10): + log.info("Starting stream task") + + for i in range(seed): + yield i + await trio.sleep(0) # trigger scheduler + + +async def stream_from_portal(p, consumed): + + async for item in await p.run(stream_data): + if item in consumed: + consumed.remove(item) + else: + consumed.append(item) + + +async def main(): + + async with tractor.open_nursery(loglevel='info') as an: + + p = await an.start_actor('stream_boi', enable_modules=[__name__]) + + consumed = [] + + async with trio.open_nursery() as n: + for i in range(2): + n.start_soon(stream_from_portal, p, consumed) + + # both streaming consumer tasks have completed and so we should + # have nothing in our list thanks to single threadedness + assert not consumed + + await an.cancel() + + +if __name__ == '__main__': + trio.run(main)