diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index 397f2c7..fd00ab4 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -96,6 +96,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
 
 
 async def result_from_portal(
+
     portal: Portal,
     actor: Actor,
@@ -103,7 +104,7 @@ async def result_from_portal(
     cancel_on_result: bool = False,
     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
 
-) -> None:
+) -> tuple[Optional[Any], Optional[BaseException]]:
     """
     Cancel actor gracefully once it's "main" portal's result arrives.
 
@@ -111,9 +112,11 @@ async def result_from_portal(
     Should only be called for actors spawned with `run_in_actor()`.
 
     """
-    __tracebackhide__ = True
+    # __tracebackhide__ = True
    uid = portal.channel.uid
+    remote_result = None
+    is_remote_result = None
 
     # cancel control is explicityl done by the caller
     with trio.CancelScope() as cs:
@@ -129,45 +132,37 @@ async def result_from_portal(
             # XXX: streams should never be reaped here since they should
             # always be established and shutdown using a context manager api
             result = await portal.result()
+            is_remote_result = True
             log.info(f"Returning final result: {result}")
 
+        except RemoteActorError as rerr:
+            # this includes real remote errors as well as
+            # `ContextCancelled`
+            is_remote_result = True
+            result = rerr
+
         except (Exception, trio.MultiError) as err:
             # we reraise in the parent task via a ``trio.MultiError``
-            result = err
-            errors[actor.uid] = err
-            raise
-
-        except trio.Cancelled as err:
-            # lol, of course we need this too ;P
-            # TODO: merge with above?
-            log.warning(f"Cancelled `Portal.result()` waiter for {uid}")
+            is_remote_result = False
             result = err
             # errors[actor.uid] = err
             # raise
 
-    if cancel_on_result:
-        if isinstance(result, Exception):
-            # errors[actor.uid] = result
-            log.warning(
-                f"Cancelling single-task-run {uid} after error {result}"
-            )
-            # raise result
+    if cs.cancelled_caught:
+        log.warning(f"Cancelled `Portal.result()` waiter for {uid}")
 
-        else:
-            log.runtime(
-                f"Cancelling {uid} gracefully "
-                f"after one-time-task result {result}")
+    return result, is_remote_result
 
-        # an actor that was `.run_in_actor()` executes a single task
-        # and delivers the result, then we cancel it.
-        # TODO: likely in the future we should just implement this using
-        # the new `open_context()` IPC api, since it's the more general
-        # api and can represent this form.
-        # XXX: do we need this?
-        # await maybe_wait_for_debugger()
-        await portal.cancel_actor()
+    # except trio.Cancelled as err:
+    #     # lol, of course we need this too ;P
+    #     # TODO: merge with above?
+    #     log.warning(f"Cancelled `Portal.result()` waiter for {uid}")
+    #     result = err
+    #     # errors[actor.uid] = err
+    #     raise
 
-    return result
+
+    # return result
 
 
 async def do_hard_kill(
@@ -209,17 +204,17 @@ async def reap_proc(
     proc: trio.Process,
     uid: tuple[str, str],
-    terminate_after: float = float('inf'),
+    terminate_after: Optional[float] = None,
     hard_kill_after: int = 0.1,
 
 ) -> None:
-    with trio.move_on_after(terminate_after) as cs:
+    with trio.move_on_after(terminate_after or float('inf')) as cs:
 
         # Wait for proc termination but **dont' yet** do
         # any out-of-ipc-land termination / process
         # killing. This is a "light" (cancellable) join,
         # the hard join is below after timeout
         await proc.wait()
-        log.info(f'{uid} terminated gracefully')
+        log.info(f'Proc for {uid} terminated gracefully')
 
     if cs.cancelled_caught and terminate_after is not float('inf'):
 
         # Always "hard" join lingering sub procs since no
@@ -248,6 +243,7 @@ async def new_proc(
     bind_addr: Tuple[str, int],
     parent_addr: Tuple[str, int],
     _runtime_vars: Dict[str, Any],  # serialized and sent to _child
+    *,
 
     graceful_kill_timeout: int = 3,
 
@@ -357,6 +353,7 @@ async def new_proc(
 
         ) as cerr:
             log.exception(f'Relaying unexpected {cerr} to nursery')
+            await breakpoint()
 
             # sending IPC-msg level cancel requests is expected to be
             # managed by the nursery.
@@ -373,161 +370,230 @@ async def new_proc(
                     # True,  # cancel_on_result
                 )
 
-        finally:
-            # 2 cases:
-            # - actor nursery was cancelled in which case
-            #   we want to try a soft reap of the actor via
-            #   ipc cancellation and then failing that do a hard
-            #   reap.
-            # - this is normal termination and we must wait indefinitely
-            #   for ria and daemon actors
-            reaping_cancelled: bool = False
-            ria = portal in actor_nursery._cancel_after_result_on_exit
+        # Graceful reap attempt - 2 cases:
+        # - actor nursery was cancelled in which case
+        #   we want to try a soft reap of the actor via
+        #   ipc cancellation and then failing that do a hard
+        #   reap.
+        # - this is normal termination and we must wait indefinitely
+        #   for ria to return and daemon actors to be cancelled
+        reaping_cancelled: bool = False
+        ria = portal in actor_nursery._cancel_after_result_on_exit
+        result = None
 
-            # this is the soft reap sequence. we can
-            # either collect results:
-            # - ria actors get them them via ``Portal.result()``
-            # - we wait forever on daemon actors until they're
-            #   cancelled by user code via ``Portal.cancel_actor()``
-            #   or ``ActorNursery.cancel(). in the latter case
-            #   we have to expect another cancel here since
-            #   the task spawning nurseries will both be cacelled
-            #   by ``ActorNursery.cancel()``.
+        # this is the soft reap sequence. we can
+        # either collect results:
+        # - ria actors get them via ``Portal.result()``
+        # - we wait forever on daemon actors until they're
+        #   cancelled by user code via ``Portal.cancel_actor()``
+        #   or ``ActorNursery.cancel()``. in the latter case
+        #   we have to expect another cancel here since
+        #   the task spawning nurseries will both be cancelled
+        #   by ``ActorNursery.cancel()``.
 
-            # OR, we're cancelled while collecting results, which
-            # case we need to try another soft cancel and reap attempt.
+        # OR, we're cancelled while collecting results, in which
+        # case we need to try another soft cancel and reap attempt.
+        try:
+            log.cancel(f'Starting soft actor reap for {uid}')
+            cancel_scope = None
+            reap_timeout = None
 
-                if portal.channel.connected() and ria:
+            if portal.channel.connected() and ria:
 
-                    # we wait for result and cancel on completion
-                    # if uid[0] == 'odds':
-                    #     await breakpoint()
-                    await result_from_portal(
-                        portal,
-                        subactor,
-                        errors,
-                        True,  # cancel_on_result
-                    )
-                    # # collect any expected ``.run_in_actor()`` results
-                    # cancel_scope = await nursery.start(
-                    #     result_from_portal,
-                    #     portal,
-                    #     subactor,
-                    #     errors,
-                    #     True,  # cancel_on_result
-                    # )
+                result, is_remote = await result_from_portal(
+                    portal,
+                    subactor,
+                    errors,
+                    # True,  # cancel_on_result
+                )
+                if is_remote:
+                    if isinstance(result, RemoteActorError):
+                        # errors[actor.uid] = result
+                        if (
+                            portal.cancel_called and
+                            isinstance(result, ContextCancelled)
+                        ):
+                            log.cancel(f'{uid} received expected cancel')
+                            errors[uid] = result
 
-                    # soft & cancellable
-                    await reap_proc(proc, uid)
+                            # fall through to below soft proc reap
+                            reap_timeout = 0.5
 
-                    # # if proc terminates before portal result
-                    # if cancel_scope:
-                    #     cancel_scope.cancel()
-
-            except (
-                ContextCancelled,
-            ) as err:
-                if portal.cancel_called:
-                    log.cancel('{uid} received expected cancel')
+                        else:
+                            log.warning(
+                                f"Cancelling single-task-run {uid} after remote error {result}"
+                            )
 
-                    # soft & cancellable
-                    await reap_proc(proc, uid, terminate_after=0.1)
+                            # likely a real remote error propagation
+                            # so pass up to nursery strat
+                            should_raise = await actor_nursery._handle_err(
+                                result,
+                                portal=portal,
+                            )
 
-            except (
-                RemoteActorError,
-            ) as err:
-                reaping_cancelled = err
-                log.exception(f'{uid} remote error')
-                await actor_nursery._handle_err(err, portal=portal)
+                            # propagate up to spawn nursery to be
+                            # grouped into any multierror.
+                            # if should_raise:
+                            #     raise result
+
+                    else:
+                        log.runtime(
+                            f"Cancelling {uid} gracefully "
+                            f"after one-time-task result {result}")
+
+                        # an actor that was `.run_in_actor()` executes a single task
+                        # and delivers the result, then we cancel it.
+                        # TODO: likely in the future we should just implement this using
+                        # the new `open_context()` IPC api, since it's the more general
+                        # api and can represent this form.
+                        # XXX: do we need this?
+                        # await maybe_wait_for_debugger()
+                        await portal.cancel_actor()
 
-            except (
-                trio.Cancelled,
-            ) as err:
-                reaping_cancelled = err
-                if actor_nursery.cancelled:
-                    log.cancel(f'{uid} wait cancelled by nursery')
                 else:
-                    log.exception(f'{uid} soft wait error?')
+                    log.exception(
+                        f"Cancelling single-task-run {uid} after local error"
+                    )
+                    raise result
 
-            except (
-                BaseException
-            ) as err:
-                reaping_cancelled = err
-                log.exception(f'{uid} soft reap local error')
+            # soft & cancellable
+            await reap_proc(proc, uid, terminate_after=reap_timeout)
 
-            finally:
-                if reaping_cancelled:
-                    if actor_nursery.cancelled:
-                        log.cancel(f'Nursery cancelled during soft wait for {uid}')
+        # except (
+        #     ContextCancelled,
+        # ) as err:
+        #     if portal.cancel_called:
+        #         log.cancel('{uid} received expected cancel')
+
+        #         # soft & cancellable
+        #         await reap_proc(proc, uid, terminate_after=0.1)
+
+        # except (
+        #     RemoteActorError,
+        # ) as err:
+        #     reaping_cancelled = err
+        #     log.exception(f'{uid} remote error')
+        #     await actor_nursery._handle_err(err, portal=portal)
+
+        except (
+            trio.Cancelled,
+        ) as err:
+
+            # NOTE: for now we pack the cancelleds and expect the actor
+            # nursery to re-raise them in a multierror but we could
+            # have also let them bubble up through the spawn nursery.
+
+            # in theory it's more correct to raise any
+            # `ContextCancelled` errors we get back from the
+            # `Portal.cancel_actor()` call and in that error
+            # have meta-data about whether we timed out or
+            # actually got a cancel message back from the remote task.
+
+            # IF INSTEAD we raised *here* then this logic has to be
+            # handled inside the oca supervisor block and the spawn_n
+            # task cancelleds would have to be replaced with the remote
+            # task `ContextCancelled`s, *if* they ever arrive.
+            errors[uid] = err
+            # with trio.CancelScope(shield=True):
+            #     await breakpoint()
+
+            if actor_nursery.cancel_called:
+                log.cancel(f'{uid} soft reap cancelled by nursery')
+            else:
+                if not actor_nursery._spawn_n.cancel_scope.cancel_called:
+                    # this would be pretty weird and unexpected
+                    await breakpoint()
+
+                # actor nursery wasn't cancelled before the spawn
+                # nursery was which likely means that there was
+                # an error in the actor nursery enter and the
+                # spawn nursery cancellation "beat" the call to
+                # .cancel()? that's a bug right?
+
+                # saw this with settings bugs in the ordermode pane in
+                # piker.
+                log.exception(f'{uid} soft wait error?')
+                raise RuntimeError(
+                    'Task spawn nursery cancelled before actor nursery?')
+
+        finally:
+            if reaping_cancelled:
+                assert actor_nursery.cancel_called
+                if actor_nursery.cancelled:
+                    log.cancel(f'Nursery cancelled during soft wait for {uid}')
+
+                with trio.CancelScope(shield=True):
+                    await maybe_wait_for_debugger()
+
+            # XXX: we should probably just
+            # check for a `ContextCancelled` on portals
+            # here and fill them in over `trio.Cancelled` right?
+
+            # hard reap sequence with timeouts
+            if proc.poll() is None:
+                log.cancel(f'Attempting hard reap for {uid}')
                 with trio.CancelScope(shield=True):
-                    await maybe_wait_for_debugger()
-                    # XXX: can't do this, it'll hang some tests.. no
-                    # idea why yet.
-                    # with trio.CancelScope(shield=True):
-                    #     await actor_nursery._handle_err(
-                    #         reaping_cancelled,
-                    #         portal=portal
-                    #     )
 
+                    # hard reap sequence
+                    # ``Portal.cancel_actor()`` is expected to have
+                    # been called by the supervising nursery so we
+                    # do **not** call it here.
 
-                # hard reap sequence with timeouts
-                if proc.poll() is None:
-                    log.cancel(f'Attempting hard reap for {uid}')
-                    with trio.CancelScope(shield=True):
-                        # hard reap sequence
-                        # ``Portal.cancel_actor()`` is expected to have
-                        # been called by the supervising nursery so we
-                        # do **not** call it here.
-                        await reap_proc(
-                            proc,
-                            uid,
-                            # this is the same as previous timeout
-                            # setting before rewriting this spawn
-                            # section
-                            terminate_after=3,
-                        )
-
-                # if somehow the hard reap didn't collect the child then
-                # we send in the big gunz.
-                while proc.poll() is None:
-                    log.critical(
-                        f'ZOMBIE LORD HAS ARRIVED for your {uid}:\n'
-                        f'{proc}'
+                    await reap_proc(
+                        proc,
+                        uid,
+                        # this is the same as previous timeout
+                        # setting before rewriting this spawn
+                        # section
+                        terminate_after=3,
                     )
-                    with trio.CancelScope(shield=True):
-                        await reap_proc(
-                            proc,
-                            uid,
-                            terminate_after=0.1,
-                        )
 
-            log.info(f"Joined {proc}")
-            # 2 cases:
-            # - the actor terminated gracefully
-            # - we're cancelled and likely need to re-raise
+                # if somehow the hard reap didn't collect the child then
+                # we send in the big gunz.
+                while proc.poll() is None:
+                    log.critical(
+                        f'ZOMBIE LORD HAS ARRIVED for your {uid}:\n'
+                        f'{proc}'
+                    )
+                    with trio.CancelScope(shield=True):
+                        await reap_proc(
+                            proc,
+                            uid,
+                            terminate_after=0.1,
+                        )
 
-            # pop child entry to indicate we no longer managing this
-            # subactor
-            subactor, proc, portal = actor_nursery._children.pop(
-                subactor.uid)
-            if not actor_nursery._children:
-                log.cancel(f"{uid} reports all children complete!")
-                actor_nursery._all_children_reaped.set()
+        log.info(f"Joined {proc}")
 
-            # not entirely sure why we need this.. but without it
-            # the reaping cancelled error is never reported upwards
-            # to the spawn nursery?
-            if reaping_cancelled:
-                raise reaping_cancelled
+        # 2 cases:
+        # - the actor terminated gracefully
+        # - we're cancelled and likely need to re-raise
+
+        # pop child entry to indicate we're no longer managing this
+        # subactor
+        subactor, proc, portal = actor_nursery._children.pop(
+            subactor.uid)
+
+        if not actor_nursery._children:
+            # all subactor children have completed
+            log.cancel(f"{uid} reports all children complete!")
+
+            actor_nursery._all_children_reaped.set()
+
+        spawn_n = actor_nursery._spawn_n
+        # with trio.CancelScope(shield=True):
+        #     await breakpoint()
+        if not spawn_n._closed:
+            # the parent task that opened the actor nursery
+            # hasn't yet closed it so we cancel that task now.
+            spawn_n.cancel_scope.cancel()
+
+        # not entirely sure why we need this.. but without it
+        # the reaping cancelled error is never reported upwards
+        # to the spawn nursery?
+        # if reaping_cancelled:
+        #     raise reaping_cancelled
 
     else:
         # `multiprocessing`
@@ -644,7 +710,8 @@ async def mp_new_proc(
 
         # no shield is required here (vs. above on the trio backend)
        # since debug mode is not supported on mp.
-        await actor_nursery._join_procs.wait()
+        with trio.CancelScope(shield=True):
+            await actor_nursery._join_procs.wait()
 
     finally:
         # XXX: in the case we were cancelled before the sub-proc
@@ -659,13 +726,23 @@ async def mp_new_proc(
             try:
                 # async with trio.open_nursery() as n:
                 # n.cancel_scope.shield = True
-                cancel_scope = await nursery.start(
-                    result_from_portal,
+                print('soft mp reap')
+                # cancel_scope = await nursery.start(
+                result = await result_from_portal(
                     portal,
                     subactor,
-                    errors
+                    errors,
+                    # True,
                 )
-            except trio.Cancelled as err:
+
+            # except trio.Cancelled as err:
+            except BaseException as err:
+
+                log.exception('hard mp reap')
+                with trio.CancelScope(shield=True):
+                    await actor_nursery._handle_err(err, portal=portal)
+                    print('sent to nursery')
+
                 cancel_exc = err
 
                 # if the reaping task was cancelled we may have hit
@@ -675,31 +752,43 @@ async def mp_new_proc(
                 reaping_cancelled = True
 
                 if proc.is_alive():
-                    with trio.move_on_after(0.1) as cs:
-                        cs.shield = True
-                        await proc_waiter(proc)
+                    with trio.CancelScope(shield=True):
+                        print('hard reaping')
+                        with trio.move_on_after(0.1) as cs:
+                            cs.shield = True
+                            await proc_waiter(proc)
 
                         if cs.cancelled_caught:
+                            print('pwning mp proc')
                             proc.terminate()
+
     finally:
-        if not reaping_cancelled and proc.is_alive():
-            await proc_waiter(proc)
+        # if not reaping_cancelled and proc.is_alive():
+        #     await proc_waiter(proc)
 
-        # TODO: timeout block here?
+        proc.join()
 
-    log.debug(f"Joined {proc}")
-    # pop child entry to indicate we are no longer managing subactor
-    subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
+        log.debug(f"Joined {proc}")
 
-    # cancel result waiter that may have been spawned in
-    # tandem if not done already
-    if cancel_scope:
-        log.warning(
-            "Cancelling existing result waiter task for "
-            f"{subactor.uid}")
-        cancel_scope.cancel()
+        # pop child entry to indicate we are no longer managing subactor
+        subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
 
-    elif reaping_cancelled:  # let the cancellation bubble up
-        assert cancel_exc
-        raise cancel_exc
+        if not actor_nursery._children:
+            # all subactor children have completed
+            # log.cancel(f"{uid} reports all children complete!")
+            actor_nursery._all_children_reaped.set()
+
+
+        # cancel result waiter that may have been spawned in
+        # tandem if not done already
+        if cancel_scope:
+            log.warning(
+                "Cancelling existing result waiter task for "
+                f"{subactor.uid}")
+            cancel_scope.cancel()
+
+        if reaping_cancelled:  # let the cancellation bubble up
+            print('raising')
+            assert cancel_exc
+            raise cancel_exc
diff --git a/tractor/_supervise.py b/tractor/_supervise.py
index 1d6b85d..729e5f2 100644
--- a/tractor/_supervise.py
+++ b/tractor/_supervise.py
@@ -12,7 +12,7 @@
 import trio
 from async_generator import asynccontextmanager
 from . import _debug
-from ._debug import maybe_wait_for_debugger
+from ._debug import maybe_wait_for_debugger, breakpoint
 from ._state import current_actor, is_main_process, is_root_process
 from .log import get_logger, get_loglevel
 from ._actor import Actor
@@ -48,10 +48,19 @@ class ActorNursery:
         # cancelled when their "main" result arrives
         self._cancel_after_result_on_exit: set = set()
         self.cancelled: bool = False
+        self._cancel_called: bool = False
         self._join_procs = trio.Event()
         self._all_children_reaped = trio.Event()
         self.errors = errors
 
+    @property
+    def cancel_called(self) -> bool:
+        '''
+        Same principle as ``trio.CancelScope.cancel_called``.
+
+        '''
+        return self._cancel_called
+
     async def start_actor(
         self,
         name: str,
@@ -177,17 +186,22 @@
         If ``hard_killl`` is set to ``True`` then kill the processes
         directly without any far end graceful ``trio`` cancellation.
 
-        """
-        self.cancelled = True
+        """
 
         # entries may be poppsed by the spawning backend as
         # actors cancel individually
         childs = self._children.copy()
 
+        if self.cancel_called:
+            log.warning(
+                f'Nursery with children {len(childs)} already cancelled')
+            return
+
         log.cancel(
             f'Cancelling nursery in {self._actor.uid} with children\n'
             f'{childs.keys()}'
         )
+        self._cancel_called = True
 
         # wake up all spawn tasks to move on as those nursery
         # has ``__aexit__()``-ed
@@ -196,44 +210,69 @@
 
         await maybe_wait_for_debugger()
 
         # one-cancels-all strat
-        async with trio.open_nursery() as cancel_sender:
-            for subactor, proc, portal in childs.values():
-                if proc.poll() is None and not portal.cancel_called:
-                    cancel_sender.start_soon(portal.cancel_actor)
+        try:
+            async with trio.open_nursery() as cancel_sender:
+                for subactor, proc, portal in childs.values():
+                    if not portal.cancel_called and portal.channel.connected():
+                        cancel_sender.start_soon(portal.cancel_actor)
+
+        except trio.MultiError as err:
+            _err = err
+            log.exception(f'{self} errors during cancel')
+            # await breakpoint()
+            # # LOL, ok so multiprocessing requires this for some reason..
+            # with trio.CancelScope(shield=True):
+            #     await trio.lowlevel.checkpoint()
 
         # cancel all spawner tasks
         # self._spawn_n.cancel_scope.cancel()
+        self.cancelled = True
 
     async def _handle_err(
         self,
         err: BaseException,
         portal: Optional[Portal] = None,
+        is_ctx_error: bool = False,
 
-    ) -> None:
+    ) -> bool:
         # XXX: hypothetically an error could be
         # raised and then a cancel signal shows up
         # slightly after in which case the `else:`
         # block here might not complete?  For now,
         # shield both.
-        with trio.CancelScope(shield=True):
-            etype = type(err)
+        if is_ctx_error:
+            assert not portal
+            uid = self._actor.uid
+        else:
+            uid = portal.channel.uid
 
-            if etype in (
-                trio.Cancelled,
-                KeyboardInterrupt
-            ) or (
-                is_multi_cancelled(err)
-            ):
-                log.cancel(
-                    f"Nursery for {current_actor().uid} "
-                    f"was cancelled with {etype}")
-            else:
-                log.exception(
-                    f"Nursery for {current_actor().uid} "
-                    f"errored with {err}, ")
+        if err not in self.errors.values():
+            self.errors[uid] = err
 
-            # cancel all subactors
-            await self.cancel()
+            with trio.CancelScope(shield=True):
+                etype = type(err)
+
+                if etype in (
+                    trio.Cancelled,
+                    KeyboardInterrupt
+                ) or (
+                    is_multi_cancelled(err)
+                ):
+                    log.cancel(
+                        f"Nursery for {current_actor().uid} "
+                        f"was cancelled with {etype}")
+                else:
+                    log.error(
+                        f"Nursery for {current_actor().uid} "
+                        f"errored from {uid} with\n{err}")
+
+                # cancel all subactors
+                await self.cancel()
+
+            return True
+
+        log.warning(f'Skipping duplicate error for {uid}')
+        return False
 
 
 @asynccontextmanager
@@ -251,6 +290,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
     # actors spawned in "daemon mode" (aka started using
    # ``ActorNursery.start_actor()``).
     src_err: Optional[BaseException] = None
+    nurse_err: Optional[BaseException] = None
 
     # errors from this daemon actor nursery bubble up to caller
     try:
@@ -303,9 +343,16 @@ async def _open_and_supervise_one_cancels_all_nursery(
             src_err = err
 
             # with trio.CancelScope(shield=True):
-            await anursery._handle_err(err)
-            raise
+            should_raise = await anursery._handle_err(err, is_ctx_error=True)
+            # XXX: raising here causes some cancellation
+            # / multierror tests to fail because of what appears to
+            # be double raise?  we probably need to see how `trio`
+            # does this case..
+            if should_raise:
+                raise
+
+        # except trio.MultiError as err:
         except BaseException as err:
             # nursery bubble up
             nurse_err = err
@@ -331,15 +378,15 @@ async def _open_and_supervise_one_cancels_all_nursery(
         # collected in ``errors`` so cancel all actors, summarize
         # all errors and re-raise.
 
-        if src_err and src_err not in errors.values():
-            errors[actor.uid] = src_err
-
+        # await breakpoint()
         if errors:
+            # if nurse_err or src_err:
             if anursery._children:
                 raise RuntimeError("WHERE TF IS THE ZOMBIE LORD!?!?!")
                 # with trio.CancelScope(shield=True):
                 #     await anursery.cancel()
 
+            # use `MultiError` as needed
            if len(errors) > 1:
                 raise trio.MultiError(tuple(errors.values()))
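
For context on what the trio-backend reap changes above boil down to: first a cancellable "soft" wait for the child to exit (optionally after collecting its `run_in_actor()` result), then a shielded "hard" reap with a timeout, and finally an unconditional kill loop for anything still alive. The following is a minimal, standalone sketch of that escalation in plain trio; it is not tractor's actual API — the helper name, timeouts, and the SIGTERM/SIGKILL ordering here are illustrative only.

# Illustrative sketch (assumptions noted above); uses only public trio
# process APIs: Process.wait()/poll()/terminate()/kill() and cancel scopes.
import trio


async def reap_with_escalation(
    proc: trio.Process,
    soft_timeout: float = 3.0,
    term_timeout: float = 0.1,
) -> None:

    # 1) soft reap: give the child a chance to exit on its own
    #    (a cancellable, "light" join).
    with trio.move_on_after(soft_timeout):
        await proc.wait()

    if proc.returncode is None:
        # 2) graceful kill: send SIGTERM and wait briefly.
        proc.terminate()
        with trio.move_on_after(term_timeout):
            await proc.wait()

    if proc.returncode is None:
        # 3) force kill and reap, shielded so a surrounding cancellation
        #    cannot leave a zombie behind.
        proc.kill()
        with trio.CancelScope(shield=True):
            await proc.wait()


async def main() -> None:
    # `trio.open_process` on older trio; `trio.lowlevel.open_process` on newer.
    open_process = getattr(trio.lowlevel, 'open_process', None) or trio.open_process
    proc = await open_process(['sleep', '10'])
    await reap_with_escalation(proc, soft_timeout=1.0)


if __name__ == '__main__':
    trio.run(main)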