forked from goodboy/tractor
1
0
Fork 0

Add a nursery "exited" signal

Use a `trio.Event` to enable nursery closure detection such that core
runtime tasks can be notified when a local nursery exits and allow
shutdown protocols to operate without close-before-terminate issues
(such as IPC channel closure during remote peer cancellation).
acked_backup
Tyler Goodlet 2021-12-01 22:05:23 -05:00
parent a23afb0bb8
commit d817f1a658
1 changed files with 16 additions and 9 deletions

View File

@ -52,6 +52,7 @@ class ActorNursery:
self.cancelled: bool = False self.cancelled: bool = False
self._join_procs = trio.Event() self._join_procs = trio.Event()
self.errors = errors self.errors = errors
self.exited = trio.Event()
async def start_actor( async def start_actor(
self, self,
@ -207,7 +208,8 @@ class ActorNursery:
# spawn cancel tasks for each sub-actor # spawn cancel tasks for each sub-actor
assert portal assert portal
nursery.start_soon(portal.cancel_actor) if portal.channel.connected():
nursery.start_soon(portal.cancel_actor)
# if we cancelled the cancel (we hung cancelling remote actors) # if we cancelled the cancel (we hung cancelling remote actors)
# then hard kill all sub-processes # then hard kill all sub-processes
@ -401,18 +403,23 @@ async def open_nursery(
async with open_root_actor(**kwargs) as actor: async with open_root_actor(**kwargs) as actor:
assert actor is current_actor() assert actor is current_actor()
# try: try:
async with _open_and_supervise_one_cancels_all_nursery(
actor
) as anursery:
yield anursery
finally:
anursery.exited.set()
else: # sub-nursery case
try:
async with _open_and_supervise_one_cancels_all_nursery( async with _open_and_supervise_one_cancels_all_nursery(
actor actor
) as anursery: ) as anursery:
yield anursery yield anursery
finally:
else: # sub-nursery case anursery.exited.set()
async with _open_and_supervise_one_cancels_all_nursery(
actor
) as anursery:
yield anursery
finally: finally:
log.debug("Nursery teardown complete") log.debug("Nursery teardown complete")