diff --git a/tractor/_runtime.py b/tractor/_runtime.py index c41f6f5..d127d9d 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -312,14 +312,19 @@ async def _invoke( # don't pop the local context until we know the # associated child isn't in debug any more await maybe_wait_for_debugger() - ctx: Context = actor._contexts.pop((chan.uid, cid)) - res_msg: str = ( - 'IPC context terminated with result:\n' - f'result={ctx._result}\n' - f'error={ctx._local_error}\n' - f'|_{pformat(ctx)}\n\n' + ctx: Context = actor._contexts.pop( + (chan.uid, cid) + ) + + res_str: str = ( + 'error: {ctx._local_error}' + if ctx._local_error + else f'result: {ctx._result}' + ) + log.cancel( + f'IPC context terminated with final {res_str}\n' + f'|_{pformat(ctx)}\n' ) - log.cancel(res_msg) if ctx.cancelled_caught: @@ -1537,8 +1542,20 @@ class Actor: # be cancelled was indeed spawned by a request from this channel ctx, func, is_complete = self._rpc_tasks[(chan, cid)] scope: CancelScope = ctx._scope + except KeyError: - log.cancel(f"{cid} has already completed/terminated?") + # NOTE: during msging race conditions this will often + # emit, some examples: + # - callee returns a result before cancel-msg/ctxc-raised + # - callee self raises ctxc before caller send request, + # - callee errors prior to cancel req. + log.cancel( + 'Cancel request invalid, RPC task already completed?\n' + f'<= canceller: {requesting_uid}\n' + f' |_{chan}\n\n' + + f'=> ctx id: {cid}\n' + ) return True log.cancel( @@ -2017,8 +2034,10 @@ async def process_messages( log.transport( # type: ignore f'<= IPC msg from peer: {chan.uid}\n\n' + # TODO: conditionally avoid fmting depending # on log level (for perf)? + # => specifically `pformat()` sub-call..? f'{pformat(msg)}\n' ) @@ -2036,14 +2055,25 @@ async def process_messages( 'Waiting on next IPC msg from\n' f'peer: {chan.uid}:\n' f'|_{chan}\n' + # f'last msg: {msg}\n' ) continue - # TODO: implement with ``match:`` syntax? - # process command request + # process a 'cmd' request-msg upack + # TODO: impl with native `msgspec.Struct` support !! + # -[ ] implement with ``match:`` syntax? + # -[ ] discard un-authed msgs as per, + # try: - ns, funcname, kwargs, actorid, cid = msg['cmd'] + ( + ns, + funcname, + kwargs, + actorid, + cid, + ) = msg['cmd'] + except KeyError: # This is the non-rpc error case, that is, an # error **not** raised inside a call to ``_invoke()`` @@ -2062,25 +2092,27 @@ async def process_messages( f'=> {ns}.{funcname}({kwargs})\n' ) if ns == 'self': + uid: tuple = chan.uid if funcname == 'cancel': func: Callable = actor.cancel - kwargs['requesting_uid'] = chan.uid + kwargs['requesting_uid'] = uid # don't start entire actor runtime cancellation # if this actor is currently in debug mode! - pdb_complete: trio.Event | None = _debug.Lock.local_pdb_complete + pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete if pdb_complete: await pdb_complete.wait() - # we immediately start the runtime machinery - # shutdown + # Either of `Actor.cancel()`/`.cancel_soon()` + # was called, so terminate this IPC msg + # loop, exit back out into `async_main()`, + # and immediately start the core runtime + # machinery shutdown! with CancelScope(shield=True): - # actor.cancel() was called so kill this - # msg loop and break out into - # ``async_main()`` log.cancel( - "Actor runtime for was remotely cancelled " - f"by {chan.uid}" + f'Cancel request for `Actor` runtime\n' + f'<= canceller: {uid}\n' + # f'=> uid: {actor.uid}\n' ) await _invoke( actor, @@ -2107,9 +2139,10 @@ async def process_messages( target_cid = kwargs['cid'] kwargs['requesting_uid'] = chan.uid log.cancel( - f'Remote request to cancel task\n' - f'remote actor: {chan.uid}\n' - f'task: {target_cid}' + f'Rx task cancel request\n' + f'<= canceller: {chan.uid}\n' + f'=> uid: {actor.uid}\n' + f' |_cid: {target_cid}\n' ) try: await _invoke(