forked from goodboy/tractor
1
0
Fork 0

Merge pull request #188 from goodboy/we_aint_got_zombie_shields

We aint got zombie shields
sync_breakpoint
goodboy 2021-01-18 11:01:14 -05:00 committed by GitHub
commit d8b6c0093c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 51 additions and 2 deletions

View File

@ -0,0 +1,45 @@
import trio
import tractor
log = tractor.log.get_logger('multiportal')
async def stream_data(seed=10):
log.info("Starting stream task")
for i in range(seed):
yield i
await trio.sleep(0) # trigger scheduler
async def stream_from_portal(p, consumed):
async for item in await p.run(stream_data):
if item in consumed:
consumed.remove(item)
else:
consumed.append(item)
async def main():
async with tractor.open_nursery(loglevel='info') as an:
p = await an.start_actor('stream_boi', enable_modules=[__name__])
consumed = []
async with trio.open_nursery() as n:
for i in range(2):
n.start_soon(stream_from_portal, p, consumed)
# both streaming consumer tasks have completed and so we should
# have nothing in our list thanks to single threadedness
assert not consumed
await an.cancel()
if __name__ == '__main__':
trio.run(main)

View File

@ -273,10 +273,14 @@ async def new_proc(
# ``trio.Process.__aexit__()`` (it tears down stdio # ``trio.Process.__aexit__()`` (it tears down stdio
# which will kill any waiting remote pdb trace). # which will kill any waiting remote pdb trace).
# TODO: No idea how we can enforce zombie
# reaping more stringently without the shield
# we used to have below...
# always "hard" join sub procs: # always "hard" join sub procs:
# no actor zombies allowed # no actor zombies allowed
with trio.CancelScope(shield=True): # with trio.CancelScope(shield=True):
await proc.wait() await proc.wait()
else: else:
# `multiprocessing` # `multiprocessing`
assert _ctx assert _ctx