diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 2cca0cb..d78cbdd 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -137,30 +137,11 @@ class ActorNursery: async def wait(self): """Wait for all subactors to complete. """ - async def wait_for_proc(proc, actor, portal, cancel_scope): - # TODO: timeout block here? - if proc.is_alive(): - await trio.hazmat.wait_readable(proc.sentinel) - # please god don't hang - proc.join() - log.debug(f"Joined {proc}") - self._children.pop(actor.uid) - # proc terminated, cancel result waiter - if cancel_scope: - log.warn( - f"Cancelling existing result waiter task for {actor.uid}") - cancel_scope.cancel() - - async def wait_for_result( - portal, actor, - task_status=trio.TASK_STATUS_IGNORED, - ): - # cancel the actor gracefully - with trio.open_cancel_scope() as cs: - task_status.started(cs) - log.info(f"Cancelling {portal.channel.uid} gracefully") - await portal.cancel_actor() - + async def maybe_consume_result(portal, actor): + if ( + portal in self._cancel_after_result_on_exit and + (portal._result is None and portal._exc is None) + ): log.debug(f"Waiting on final result from {subactor.uid}") res = await portal.result() # if it's an async-gen then we should alert the user @@ -173,6 +154,33 @@ class ActorNursery: async for item in agen: log.debug(f"Consuming item {item}") + async def wait_for_proc(proc, actor, portal, cancel_scope): + # TODO: timeout block here? + if proc.is_alive(): + await trio.hazmat.wait_readable(proc.sentinel) + # please god don't hang + proc.join() + log.debug(f"Joined {proc}") + await maybe_consume_result(portal, actor) + + self._children.pop(actor.uid) + # proc terminated, cancel result waiter + if cancel_scope: + log.warn( + f"Cancelling existing result waiter task for {actor.uid}") + cancel_scope.cancel() + + async def wait_for_actor( + portal, actor, + task_status=trio.TASK_STATUS_IGNORED, + ): + # cancel the actor gracefully + with trio.open_cancel_scope() as cs: + task_status.started(cs) + await maybe_consume_result(portal, actor) + log.info(f"Cancelling {portal.channel.uid} gracefully") + await portal.cancel_actor() + if cs.cancelled_caught: log.warn("Result waiter was cancelled") @@ -182,7 +190,7 @@ class ActorNursery: for subactor, proc, portal in children.values(): cs = None if portal in self._cancel_after_result_on_exit: - cs = await nursery.start(wait_for_result, portal, subactor) + cs = await nursery.start(wait_for_actor, portal, subactor) nursery.start_soon(wait_for_proc, proc, subactor, portal, cs) async def cancel(self, hard_kill=False):