From e403d63eb732d748f080bdca79a333d2be554b2a Mon Sep 17 00:00:00 2001
From: Tyler Goodlet <jgbt@protonmail.com>
Date: Wed, 21 Feb 2024 13:05:22 -0500
Subject: [PATCH] Better logging for cancel requests in IPC msg loop

As similarly improved in other parts of the runtime, adds much more
pedantic (`.cancel()`) logging content to indicate the src of remote
cancellation request particularly for `Actor.cancel()` and
`._cancel_task()` cases prior to `._invoke()` task scheduling. Also add
detailed case comments and much more info to the
"request-to-cancel-already-terminated-RPC-task" log emission to include
the `Channel` and `Context.cid` deats.

This helped me find the src of a race condition causing a test to fail
where a callee ctx task was returning a result *before* an expected
`ctx.cancel()` request arrived B). Adding much more pedantic
`.cancel()` msg contents around the requester's deats should ensure
these cases are much easier to detect going forward!

Also, simplify the `._invoke()` final result/error log msg to only put
*one of either* the final error or returned result above the `Context`
pprint.
---
 tractor/_runtime.py | 79 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 56 insertions(+), 23 deletions(-)

diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index fe171827..a9bbc0d6 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -305,14 +305,19 @@ async def _invoke(
                 # don't pop the local context until we know the
                 # associated child isn't in debug any more
                 await _debug.maybe_wait_for_debugger()
-                ctx: Context = actor._contexts.pop((chan.uid, cid))
-                res_msg: str = (
-                    'IPC context terminated with result:\n'
-                    f'result={ctx._result}\n'
-                    f'error={ctx._local_error}\n'
-                    f'|_{pformat(ctx)}\n\n'
+                ctx: Context = actor._contexts.pop(
+                    (chan.uid, cid)
+                )
+
+                res_str: str = (
+                    'error: {ctx._local_error}'
+                    if ctx._local_error
+                    else f'result: {ctx._result}'
+                )
+                log.cancel(
+                    f'IPC context terminated with final {res_str}\n'
+                    f'|_{pformat(ctx)}\n'
                 )
-                log.cancel(res_msg)
 
             if ctx.cancelled_caught:
 
@@ -1453,8 +1458,20 @@ class Actor:
             # be cancelled was indeed spawned by a request from this channel
             ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
             scope: CancelScope = ctx._scope
+
         except KeyError:
-            log.cancel(f"{cid} has already completed/terminated?")
+            # NOTE: during msging race conditions this will often
+            # emit, some examples:
+            # - callee returns a result before cancel-msg/ctxc-raised
+            # - callee self raises ctxc before caller send request,
+            # - callee errors prior to cancel req.
+            log.cancel(
+                'Cancel request invalid, RPC task already completed?\n'
+                f'<= canceller: {requesting_uid}\n'
+                f'  |_{chan}\n\n'
+
+                f'=> ctx id: {cid}\n'
+            )
             return True
 
         log.cancel(
@@ -1868,8 +1885,10 @@ async def process_messages(
 
                 log.transport(   # type: ignore
                     f'<= IPC msg from peer: {chan.uid}\n\n'
+
                     # TODO: conditionally avoid fmting depending
                     # on log level (for perf)?
+                    # => specifically `pformat()` sub-call..?
                     f'{pformat(msg)}\n'
                 )
 
@@ -1887,14 +1906,25 @@ async def process_messages(
                         'Waiting on next IPC msg from\n'
                         f'peer: {chan.uid}:\n'
                         f'|_{chan}\n'
+
                         # f'last msg: {msg}\n'
                     )
                     continue
 
-                # TODO: implement with ``match:`` syntax?
-                # process command request
+                # process a 'cmd' request-msg upack
+                # TODO: impl with native `msgspec.Struct` support !!
+                # -[ ] implement with ``match:`` syntax?
+                # -[ ] discard un-authed msgs as per,
+                # <TODO put issue for typed msging structs>
                 try:
-                    ns, funcname, kwargs, actorid, cid = msg['cmd']
+                    (
+                        ns,
+                        funcname,
+                        kwargs,
+                        actorid,
+                        cid,
+                    ) = msg['cmd']
+
                 except KeyError:
                     # This is the non-rpc error case, that is, an
                     # error **not** raised inside a call to ``_invoke()``
@@ -1913,25 +1943,27 @@ async def process_messages(
                     f'=> {ns}.{funcname}({kwargs})\n'
                 )
                 if ns == 'self':
+                    uid: tuple = chan.uid
                     if funcname == 'cancel':
                         func: Callable = actor.cancel
-                        kwargs['requesting_uid'] = chan.uid
+                        kwargs['requesting_uid'] = uid
 
                         # don't start entire actor runtime cancellation
                         # if this actor is currently in debug mode!
-                        pdb_complete: trio.Event | None = _debug.Lock.local_pdb_complete
+                        pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
                         if pdb_complete:
                             await pdb_complete.wait()
 
-                        # we immediately start the runtime machinery
-                        # shutdown
+                        # Either of  `Actor.cancel()`/`.cancel_soon()`
+                        # was called, so terminate this IPC msg
+                        # loop, exit back out into `async_main()`,
+                        # and immediately start the core runtime
+                        # machinery shutdown!
                         with CancelScope(shield=True):
-                            # actor.cancel() was called so kill this
-                            # msg loop and break out into
-                            # ``async_main()``
                             log.cancel(
-                                "Actor runtime for was remotely cancelled "
-                                f"by {chan.uid}"
+                                f'Cancel request for `Actor` runtime\n'
+                                f'<= canceller: {uid}\n'
+                                # f'=> uid: {actor.uid}\n'
                             )
                             await _invoke(
                                 actor,
@@ -1958,9 +1990,10 @@ async def process_messages(
                         target_cid = kwargs['cid']
                         kwargs['requesting_uid'] = chan.uid
                         log.cancel(
-                            f'Remote request to cancel task\n'
-                            f'remote actor: {chan.uid}\n'
-                            f'task: {target_cid}'
+                            f'Rx task cancel request\n'
+                            f'<= canceller: {chan.uid}\n'
+                            f'=> uid: {actor.uid}\n'
+                            f'  |_cid: {target_cid}\n'
                         )
                         try:
                             await _invoke(