From 14d60147fabbd3646abea6a8b088ba261f7eeea7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 8 Jan 2021 20:36:35 -0500 Subject: [PATCH 1/2] Add an example which breaks shielded proc waits --- examples/multiple_streams_one_portal.py | 45 +++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 examples/multiple_streams_one_portal.py diff --git a/examples/multiple_streams_one_portal.py b/examples/multiple_streams_one_portal.py new file mode 100644 index 0000000..354ec66 --- /dev/null +++ b/examples/multiple_streams_one_portal.py @@ -0,0 +1,45 @@ +import trio +import tractor + + +log = tractor.log.get_logger('multiportal') + + +async def stream_data(seed=10): + log.info("Starting stream task") + + for i in range(seed): + yield i + await trio.sleep(0) # trigger scheduler + + +async def stream_from_portal(p, consumed): + + async for item in await p.run(stream_data): + if item in consumed: + consumed.remove(item) + else: + consumed.append(item) + + +async def main(): + + async with tractor.open_nursery(loglevel='info') as an: + + p = await an.start_actor('stream_boi', enable_modules=[__name__]) + + consumed = [] + + async with trio.open_nursery() as n: + for i in range(2): + n.start_soon(stream_from_portal, p, consumed) + + # both streaming consumer tasks have completed and so we should + # have nothing in our list thanks to single threadedness + assert not consumed + + await an.cancel() + + +if __name__ == '__main__': + trio.run(main) From 9f4e497b9ce1054655ef6062f81f662402fb62b9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 8 Jan 2021 20:36:57 -0500 Subject: [PATCH 2/2] Don't shield proc waits --- tractor/_spawn.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 31a414c..f8528ac 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -273,10 +273,14 @@ async def new_proc( # ``trio.Process.__aexit__()`` (it tears down stdio # which will kill any waiting remote pdb trace). + # TODO: No idea how we can enforce zombie + # reaping more stringently without the shield + # we used to have below... + # always "hard" join sub procs: # no actor zombies allowed - with trio.CancelScope(shield=True): - await proc.wait() + # with trio.CancelScope(shield=True): + await proc.wait() else: # `multiprocessing` assert _ctx