From e80e0a551f3e76e42c7c83f172f5144621460003 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 7 Apr 2023 16:07:26 -0400 Subject: [PATCH] Change a bunch of log levels to cancel, including any `ContextCancelled` handling --- tractor/_entry.py | 2 +- tractor/_runtime.py | 51 +++++++++++++++++++++++++++++++++++---------- 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/tractor/_entry.py b/tractor/_entry.py index 1e7997e..e8fb56d 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -132,7 +132,7 @@ def _trio_main( else: trio.run(trio_main) except KeyboardInterrupt: - log.warning(f"Actor {actor.uid} received KBI") + log.cancel(f"Actor {actor.uid} received KBI") finally: log.info(f"Actor {actor.uid} terminated") diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 06bc34f..97f3025 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -221,6 +221,8 @@ async def _invoke( assert cs if cs.cancelled_caught: + # if 'brokerd.kraken' in actor.uid: + # await _debug.breakpoint() # TODO: pack in ``trio.Cancelled.__traceback__`` here # so they can be unwrapped and displayed on the caller @@ -228,11 +230,11 @@ async def _invoke( fname = func.__name__ if ctx._cancel_called: - msg = f'`{fname}()` cancelled itself' + msg = f'`{fname}()`@{actor.uid} cancelled itself' elif cs.cancel_called: msg = ( - f'`{fname}()` was remotely cancelled by its caller ' + f'`{fname}()`@{actor.uid} was remotely cancelled by its caller ' f'{ctx.chan.uid}' ) @@ -910,9 +912,15 @@ class Actor: ''' cid = str(uuid.uuid4()) assert chan.uid - ctx = self.get_context(chan, cid, msg_buffer_size=msg_buffer_size) + ctx = self.get_context( + chan, + cid, + msg_buffer_size=msg_buffer_size, + ) log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") - await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) + await chan.send( + {'cmd': (ns, func, kwargs, self.uid, cid)} + ) # Wait on first response msg and validate; this should be # immediate. @@ -922,7 +930,11 @@ class Actor: if 'error' in first_msg: raise unpack_error(first_msg, chan) - elif functype not in ('asyncfunc', 'asyncgen', 'context'): + elif functype not in ( + 'asyncfunc', + 'asyncgen', + 'context', + ): raise ValueError(f"{first_msg} is an invalid response packet?") ctx._remote_func_type = functype @@ -1130,7 +1142,7 @@ class Actor: async def cancel_rpc_tasks( self, - only_chan: Optional[Channel] = None, + only_chan: Channel | None = None, ) -> None: ''' Cancel all existing RPC responder tasks using the cancel scope @@ -1331,13 +1343,15 @@ async def async_main( ) ) log.runtime("Waiting on service nursery to complete") - log.runtime("Service nursery complete") - log.runtime("Waiting on root nursery to complete") + log.runtime( + "Service nursery complete\n" + "Waiting on root nursery to complete" + ) # Blocks here as expected until the root nursery is # killed (i.e. this actor is cancelled or signalled by the parent) except Exception as err: - log.info("Closing all actor lifetime contexts") + log.runtime("Closing all actor lifetime contexts") actor.lifetime_stack.close() if not registered_with_arbiter: @@ -1358,7 +1372,14 @@ async def async_main( await try_ship_error_to_parent(actor._parent_chan, err) # always! - log.exception("Actor errored:") + match err: + case ContextCancelled(): + log.cancel( + f'Actor: {actor.uid} was task-context-cancelled with,\n' + f'str(err)' + ) + case _: + log.exception("Actor errored:") raise finally: @@ -1631,7 +1652,15 @@ async def process_messages( else: # ship any "internal" exception (i.e. one from internal # machinery not from an rpc task) to parent - log.exception("Actor errored:") + match err: + case ContextCancelled(): + log.cancel( + f'Actor: {actor.uid} was task-context-cancelled with,\n' + f'str(err)' + ) + case _: + log.exception("Actor errored:") + if actor._parent_chan: await try_ship_error_to_parent(actor._parent_chan, err)