diff --git a/tractor/_portal.py b/tractor/_portal.py index d9ee957..04fa0a0 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -148,7 +148,9 @@ class Portal: """Return the result(s) from the remote actor's "main" task. """ if self._expect_result is None: - raise RuntimeError("This portal is not expecting a final result?") + raise RuntimeError( + f"Portal for {self.channel.uid} is not expecting a final" + "result?") elif self._result is None: self._result = await self._return_from_resptype( *self._expect_result diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 569f731..2cca0cb 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -126,18 +126,18 @@ class ActorNursery: bind_addr=bind_addr, statespace=statespace, ) + self._cancel_after_result_on_exit.add(portal) await portal._submit_for_result( mod_path, fn.__name__, **kwargs ) - self._cancel_after_result_on_exit.add(portal) return portal async def wait(self): """Wait for all subactors to complete. """ - async def wait_for_proc(proc, actor, portal): + async def wait_for_proc(proc, actor, portal, cancel_scope): # TODO: timeout block here? if proc.is_alive(): await trio.hazmat.wait_readable(proc.sentinel) @@ -145,33 +145,45 @@ class ActorNursery: proc.join() log.debug(f"Joined {proc}") self._children.pop(actor.uid) - - async def wait_for_result(portal, actor): - # cancel the actor gracefully - log.info(f"Cancelling {portal.channel.uid} gracefully") - await portal.cancel_actor() - - log.debug(f"Waiting on final result from {subactor.uid}") - res = await portal.result() - # if it's an async-gen then we should alert the user - # that we're cancelling it - if inspect.isasyncgen(res): + # proc terminated, cancel result waiter + if cancel_scope: log.warn( - f"Blindly consuming asyncgen for {actor.uid}") - with trio.fail_after(1): - async with aclosing(res) as agen: - async for item in agen: - log.debug(f"Consuming item {item}") + f"Cancelling existing result waiter task for {actor.uid}") + cancel_scope.cancel() + + async def wait_for_result( + portal, actor, + task_status=trio.TASK_STATUS_IGNORED, + ): + # cancel the actor gracefully + with trio.open_cancel_scope() as cs: + task_status.started(cs) + log.info(f"Cancelling {portal.channel.uid} gracefully") + await portal.cancel_actor() + + log.debug(f"Waiting on final result from {subactor.uid}") + res = await portal.result() + # if it's an async-gen then we should alert the user + # that we're cancelling it + if inspect.isasyncgen(res): + log.warn( + f"Blindly consuming asyncgen for {actor.uid}") + with trio.fail_after(1): + async with aclosing(res) as agen: + async for item in agen: + log.debug(f"Consuming item {item}") + + if cs.cancelled_caught: + log.warn("Result waiter was cancelled") # unblocks when all waiter tasks have completed children = self._children.copy() async with trio.open_nursery() as nursery: for subactor, proc, portal in children.values(): - nursery.start_soon(wait_for_proc, proc, subactor, portal) - if proc.is_alive() and ( - portal in self._cancel_after_result_on_exit - ): - nursery.start_soon(wait_for_result, portal, subactor) + cs = None + if portal in self._cancel_after_result_on_exit: + cs = await nursery.start(wait_for_result, portal, subactor) + nursery.start_soon(wait_for_proc, proc, subactor, portal, cs) async def cancel(self, hard_kill=False): """Cancel this nursery by instructing each subactor to cancel