Better logging for cancel requests in IPC msg loop
As similarly improved in other parts of the runtime, adds much more pedantic (`.cancel()`) logging content to indicate the src of remote cancellation request particularly for `Actor.cancel()` and `._cancel_task()` cases prior to `._invoke()` task scheduling. Also add detailed case comments and much more info to the "request-to-cancel-already-terminated-RPC-task" log emission to include the `Channel` and `Context.cid` deats. This helped me find the src of a race condition causing a test to fail where a callee ctx task was returning a result *before* an expected `ctx.cancel()` request arrived B). Adding much more pedantic `.cancel()` msg contents around the requester's deats should ensure these cases are much easier to detect going forward! Also, simplify the `._invoke()` final result/error log msg to only put *one of either* the final error or returned result above the `Context` pprint.pre_pretty_struct_dep_commit_b54cb66
							parent
							
								
									3c385c6949
								
							
						
					
					
						commit
						e403d63eb7
					
				|  | @ -305,14 +305,19 @@ async def _invoke( | ||||||
|                 # don't pop the local context until we know the |                 # don't pop the local context until we know the | ||||||
|                 # associated child isn't in debug any more |                 # associated child isn't in debug any more | ||||||
|                 await _debug.maybe_wait_for_debugger() |                 await _debug.maybe_wait_for_debugger() | ||||||
|                 ctx: Context = actor._contexts.pop((chan.uid, cid)) |                 ctx: Context = actor._contexts.pop( | ||||||
|                 res_msg: str = ( |                     (chan.uid, cid) | ||||||
|                     'IPC context terminated with result:\n' |                 ) | ||||||
|                     f'result={ctx._result}\n' | 
 | ||||||
|                     f'error={ctx._local_error}\n' |                 res_str: str = ( | ||||||
|                     f'|_{pformat(ctx)}\n\n' |                     'error: {ctx._local_error}' | ||||||
|  |                     if ctx._local_error | ||||||
|  |                     else f'result: {ctx._result}' | ||||||
|  |                 ) | ||||||
|  |                 log.cancel( | ||||||
|  |                     f'IPC context terminated with final {res_str}\n' | ||||||
|  |                     f'|_{pformat(ctx)}\n' | ||||||
|                 ) |                 ) | ||||||
|                 log.cancel(res_msg) |  | ||||||
| 
 | 
 | ||||||
|             if ctx.cancelled_caught: |             if ctx.cancelled_caught: | ||||||
| 
 | 
 | ||||||
|  | @ -1453,8 +1458,20 @@ class Actor: | ||||||
|             # be cancelled was indeed spawned by a request from this channel |             # be cancelled was indeed spawned by a request from this channel | ||||||
|             ctx, func, is_complete = self._rpc_tasks[(chan, cid)] |             ctx, func, is_complete = self._rpc_tasks[(chan, cid)] | ||||||
|             scope: CancelScope = ctx._scope |             scope: CancelScope = ctx._scope | ||||||
|  | 
 | ||||||
|         except KeyError: |         except KeyError: | ||||||
|             log.cancel(f"{cid} has already completed/terminated?") |             # NOTE: during msging race conditions this will often | ||||||
|  |             # emit, some examples: | ||||||
|  |             # - callee returns a result before cancel-msg/ctxc-raised | ||||||
|  |             # - callee self raises ctxc before caller send request, | ||||||
|  |             # - callee errors prior to cancel req. | ||||||
|  |             log.cancel( | ||||||
|  |                 'Cancel request invalid, RPC task already completed?\n' | ||||||
|  |                 f'<= canceller: {requesting_uid}\n' | ||||||
|  |                 f'  |_{chan}\n\n' | ||||||
|  | 
 | ||||||
|  |                 f'=> ctx id: {cid}\n' | ||||||
|  |             ) | ||||||
|             return True |             return True | ||||||
| 
 | 
 | ||||||
|         log.cancel( |         log.cancel( | ||||||
|  | @ -1868,8 +1885,10 @@ async def process_messages( | ||||||
| 
 | 
 | ||||||
|                 log.transport(   # type: ignore |                 log.transport(   # type: ignore | ||||||
|                     f'<= IPC msg from peer: {chan.uid}\n\n' |                     f'<= IPC msg from peer: {chan.uid}\n\n' | ||||||
|  | 
 | ||||||
|                     # TODO: conditionally avoid fmting depending |                     # TODO: conditionally avoid fmting depending | ||||||
|                     # on log level (for perf)? |                     # on log level (for perf)? | ||||||
|  |                     # => specifically `pformat()` sub-call..? | ||||||
|                     f'{pformat(msg)}\n' |                     f'{pformat(msg)}\n' | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|  | @ -1887,14 +1906,25 @@ async def process_messages( | ||||||
|                         'Waiting on next IPC msg from\n' |                         'Waiting on next IPC msg from\n' | ||||||
|                         f'peer: {chan.uid}:\n' |                         f'peer: {chan.uid}:\n' | ||||||
|                         f'|_{chan}\n' |                         f'|_{chan}\n' | ||||||
|  | 
 | ||||||
|                         # f'last msg: {msg}\n' |                         # f'last msg: {msg}\n' | ||||||
|                     ) |                     ) | ||||||
|                     continue |                     continue | ||||||
| 
 | 
 | ||||||
|                 # TODO: implement with ``match:`` syntax? |                 # process a 'cmd' request-msg upack | ||||||
|                 # process command request |                 # TODO: impl with native `msgspec.Struct` support !! | ||||||
|  |                 # -[ ] implement with ``match:`` syntax? | ||||||
|  |                 # -[ ] discard un-authed msgs as per, | ||||||
|  |                 # <TODO put issue for typed msging structs> | ||||||
|                 try: |                 try: | ||||||
|                     ns, funcname, kwargs, actorid, cid = msg['cmd'] |                     ( | ||||||
|  |                         ns, | ||||||
|  |                         funcname, | ||||||
|  |                         kwargs, | ||||||
|  |                         actorid, | ||||||
|  |                         cid, | ||||||
|  |                     ) = msg['cmd'] | ||||||
|  | 
 | ||||||
|                 except KeyError: |                 except KeyError: | ||||||
|                     # This is the non-rpc error case, that is, an |                     # This is the non-rpc error case, that is, an | ||||||
|                     # error **not** raised inside a call to ``_invoke()`` |                     # error **not** raised inside a call to ``_invoke()`` | ||||||
|  | @ -1913,25 +1943,27 @@ async def process_messages( | ||||||
|                     f'=> {ns}.{funcname}({kwargs})\n' |                     f'=> {ns}.{funcname}({kwargs})\n' | ||||||
|                 ) |                 ) | ||||||
|                 if ns == 'self': |                 if ns == 'self': | ||||||
|  |                     uid: tuple = chan.uid | ||||||
|                     if funcname == 'cancel': |                     if funcname == 'cancel': | ||||||
|                         func: Callable = actor.cancel |                         func: Callable = actor.cancel | ||||||
|                         kwargs['requesting_uid'] = chan.uid |                         kwargs['requesting_uid'] = uid | ||||||
| 
 | 
 | ||||||
|                         # don't start entire actor runtime cancellation |                         # don't start entire actor runtime cancellation | ||||||
|                         # if this actor is currently in debug mode! |                         # if this actor is currently in debug mode! | ||||||
|                         pdb_complete: trio.Event | None = _debug.Lock.local_pdb_complete |                         pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete | ||||||
|                         if pdb_complete: |                         if pdb_complete: | ||||||
|                             await pdb_complete.wait() |                             await pdb_complete.wait() | ||||||
| 
 | 
 | ||||||
|                         # we immediately start the runtime machinery |                         # Either of  `Actor.cancel()`/`.cancel_soon()` | ||||||
|                         # shutdown |                         # was called, so terminate this IPC msg | ||||||
|  |                         # loop, exit back out into `async_main()`, | ||||||
|  |                         # and immediately start the core runtime | ||||||
|  |                         # machinery shutdown! | ||||||
|                         with CancelScope(shield=True): |                         with CancelScope(shield=True): | ||||||
|                             # actor.cancel() was called so kill this |  | ||||||
|                             # msg loop and break out into |  | ||||||
|                             # ``async_main()`` |  | ||||||
|                             log.cancel( |                             log.cancel( | ||||||
|                                 "Actor runtime for was remotely cancelled " |                                 f'Cancel request for `Actor` runtime\n' | ||||||
|                                 f"by {chan.uid}" |                                 f'<= canceller: {uid}\n' | ||||||
|  |                                 # f'=> uid: {actor.uid}\n' | ||||||
|                             ) |                             ) | ||||||
|                             await _invoke( |                             await _invoke( | ||||||
|                                 actor, |                                 actor, | ||||||
|  | @ -1958,9 +1990,10 @@ async def process_messages( | ||||||
|                         target_cid = kwargs['cid'] |                         target_cid = kwargs['cid'] | ||||||
|                         kwargs['requesting_uid'] = chan.uid |                         kwargs['requesting_uid'] = chan.uid | ||||||
|                         log.cancel( |                         log.cancel( | ||||||
|                             f'Remote request to cancel task\n' |                             f'Rx task cancel request\n' | ||||||
|                             f'remote actor: {chan.uid}\n' |                             f'<= canceller: {chan.uid}\n' | ||||||
|                             f'task: {target_cid}' |                             f'=> uid: {actor.uid}\n' | ||||||
|  |                             f'  |_cid: {target_cid}\n' | ||||||
|                         ) |                         ) | ||||||
|                         try: |                         try: | ||||||
|                             await _invoke( |                             await _invoke( | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue