diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index 2936220..141d7c8 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -35,7 +35,7 @@
 from exceptiongroup import BaseExceptionGroup
 import trio
 from trio_typing import TaskStatus
-from .devx._debug import (
+from .devx import (
     maybe_wait_for_debugger,
     acquire_debug_lock,
 )
@@ -144,7 +144,7 @@ async def exhaust_portal(
 
         # XXX: streams should never be reaped here since they should
         # always be established and shutdown using a context manager api
-        final = await portal.result()
+        final: Any = await portal.result()
 
     except (
         Exception,
@@ -152,13 +152,23 @@
     ) as err:
         # we reraise in the parent task via a ``BaseExceptionGroup``
         return err
+
     except trio.Cancelled as err:
         # lol, of course we need this too ;P
         # TODO: merge with above?
-        log.warning(f"Cancelled result waiter for {portal.actor.uid}")
+        log.warning(
+            'Cancelled portal result waiter task:\n'
+            f'uid: {portal.channel.uid}\n'
+            f'error: {err}\n'
+        )
         return err
+
     else:
-        log.debug(f"Returning final result: {final}")
+        log.debug(
+            f'Returning final result from portal:\n'
+            f'uid: {portal.channel.uid}\n'
+            f'result: {final}\n'
+        )
         return final
@@ -170,26 +180,34 @@ async def cancel_on_completion(
 
 ) -> None:
     '''
-    Cancel actor gracefully once it's "main" portal's
+    Cancel actor gracefully once its "main" portal's
     result arrives.
 
-    Should only be called for actors spawned with `run_in_actor()`.
+    Should only be called for actors spawned via the
+    `Portal.run_in_actor()` API.
+
+    => and really this API will be deprecated and should be
+    re-implemented as a `.hilevel.one_shot_task_nursery()`..)
 
     '''
     # if this call errors we store the exception for later
     # in ``errors`` which will be reraised inside
     # an exception group and we still send out a cancel request
-    result = await exhaust_portal(portal, actor)
+    result: Any|Exception = await exhaust_portal(portal, actor)
     if isinstance(result, Exception):
-        errors[actor.uid] = result
+        errors[actor.uid]: Exception = result
         log.warning(
-            f"Cancelling {portal.channel.uid} after error {result}"
+            'Cancelling subactor due to error:\n'
+            f'uid: {portal.channel.uid}\n'
+            f'error: {result}\n'
         )
 
     else:
         log.runtime(
-            f"Cancelling {portal.channel.uid} gracefully "
-            f"after result {result}")
+            'Cancelling subactor gracefully:\n'
+            f'uid: {portal.channel.uid}\n'
+            f'result: {result}\n'
+        )
 
     # cancel the process now that we have a final result
     await portal.cancel_actor()
@@ -219,11 +237,14 @@ async def do_hard_kill(
     to be handled.
 
     '''
+    log.cancel(
+        'Terminating sub-proc:\n'
+        f'|_{proc}\n'
+    )
     # NOTE: this timeout used to do nothing since we were shielding
     # the ``.wait()`` inside ``new_proc()`` which will pretty much
     # never release until the process exits, now it acts as
     # a hard-kill time ultimatum.
-    log.debug(f"Terminating {proc}")
     with trio.move_on_after(terminate_after) as cs:
 
         # NOTE: code below was copied verbatim from the now deprecated
@@ -260,7 +281,10 @@ async def do_hard_kill(
     # zombies (as a feature) we ask the OS to do send in the
     # removal swad as the last resort.
     if cs.cancelled_caught:
-        log.critical(f"#ZOMBIE_LORD_IS_HERE: {proc}")
+        log.critical(
+            'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'
+            f'|_{proc}\n'
+        )
         proc.kill()
@@ -281,10 +305,16 @@ async def soft_wait(
     join/reap on an actor-runtime-in-process.
 
     '''
-    uid = portal.channel.uid
+    uid: tuple[str, str] = portal.channel.uid
     try:
-        log.cancel(f'Soft waiting on actor:\n{uid}')
+        log.cancel(
+            'Soft waiting on sub-actor proc:\n'
+            f'uid: {uid}\n'
+            f'|_{proc}\n'
+        )
+        # wait on sub-proc to signal termination
         await wait_func(proc)
+
     except trio.Cancelled:
         # if cancelled during a soft wait, cancel the child
         # actor before entering the hard reap sequence
@@ -296,8 +326,8 @@
 
             async def cancel_on_proc_deth():
                 '''
-                Cancel the actor cancel request if we detect
-                that the process terminated.
+                "Cancel the (actor) cancel" request if we detect
+                that the underlying sub-process terminated.
 
                 '''
                 await wait_func(proc)
@@ -314,10 +344,10 @@
 
             if proc.poll() is None:  # type: ignore
                 log.warning(
-                    'Actor still alive after cancel request:\n'
-                    f'{uid}'
+                    'Subactor still alive after cancel request?\n\n'
+                    f'uid: {uid}\n'
+                    f'|_{proc}\n'
                 )
-
                 n.cancel_scope.cancel()
         raise
@@ -341,7 +371,7 @@ async def new_proc(
 
 ) -> None:
 
     # lookup backend spawning target
-    target = _methods[_spawn_method]
+    target: Callable = _methods[_spawn_method]
 
     # mark the new actor with the global spawn method
     subactor._spawn_method = _spawn_method
@@ -491,8 +521,9 @@ async def trio_proc(
             # cancel result waiter that may have been spawned in
             # tandem if not done already
             log.cancel(
-                "Cancelling existing result waiter task for "
-                f"{subactor.uid}")
+                'Cancelling existing result waiter task for '
+                f'{subactor.uid}'
+            )
             nursery.cancel_scope.cancel()
 
     finally:
@@ -510,18 +541,35 @@
                         with trio.move_on_after(0.5):
                             await proc.wait()
 
-                if is_root_process():
-                    # TODO: solve the following issue where we need
-                    # to do a similar wait like this but in an
-                    # "intermediary" parent actor that itself isn't
-                    # in debug but has a child that is, and we need
-                    # to hold off on relaying SIGINT until that child
-                    # is complete.
-                    # https://github.com/goodboy/tractor/issues/320
-                    await maybe_wait_for_debugger(
-                        child_in_debug=_runtime_vars.get(
-                            '_debug_mode', False),
-                    )
+                log.pdb(
+                    'Delaying subproc reaper while debugger locked..'
+                )
+                await maybe_wait_for_debugger(
+                    child_in_debug=_runtime_vars.get(
+                        '_debug_mode', False
+                    ),
+                    # TODO: need a diff value than default?
+                    # poll_steps=9999999,
+                )
+                # TODO: solve the following issue where we need
+                # to do a similar wait like this but in an
+                # "intermediary" parent actor that itself isn't
+                # in debug but has a child that is, and we need
+                # to hold off on relaying SIGINT until that child
+                # is complete.
+                # https://github.com/goodboy/tractor/issues/320
+                # -[ ] we need to handle non-root parent-actors specially
+                # by somehow determining if a child is in debug and then
+                # avoiding cancel/kill of said child by this
+                # (intermediary) parent until such a time as the root says
+                # the pdb lock is released and we are good to tear down
+                # (our children)..
+                #
+                # -[ ] so maybe something like this where we try to
+                # acquire the lock and get notified of who has it,
+                # check that uid against our known children?
+                # this_uid: tuple[str, str] = current_actor().uid
+                # await acquire_debug_lock(this_uid)
 
                 if proc.poll() is None:
                     log.cancel(f"Attempting to hard kill {proc}")