From 364ea919835a4f23e27928bedcc1aaa6cf2e7385 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 7 Mar 2024 18:24:00 -0500 Subject: [PATCH] Set `._cancel_msg` to RPC `{cmd: 'self._cancel_task', ..}` msg Like how we set `Context._cancel_msg` in `._deliver_msg()` (in which case normally it's an `{'error': ..}` msg), do the same when any RPC task is remotely cancelled via `Actor._cancel_task` where that task doesn't yet have a cancel msg set yet. This makes is much easier to distinguish between ctx cancellations due to some remote error vs. Explicit remote requests via any of `Actor.cancel()`, `Portal.cancel_actor()` or `Context.cancel()`. --- tractor/_runtime.py | 74 +++++++++++++++++++++++++++------------------ 1 file changed, 45 insertions(+), 29 deletions(-) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 4c1181d..64549ba 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -302,7 +302,7 @@ async def _errors_relayed_via_ipc( ) ) ): - # await pause() + # await _debug.pause() # XXX QUESTION XXX: is there any case where we'll # want to debug IPC disconnects as a default? # => I can't think of a reason that inspecting this @@ -322,6 +322,12 @@ async def _errors_relayed_via_ipc( cid=ctx.cid, ) + # NOTE: the src actor should always be packed into the + # error.. but how should we verify this? + # assert err_msg['src_actor_uid'] + # if not err_msg['error'].get('src_actor_uid'): + # import pdbp; pdbp.set_trace() + if is_rpc: try: await chan.send(err_msg) @@ -566,6 +572,7 @@ async def _invoke( # inside ._context._drain_to_final_msg()`.. # # TODO: remove this ^ right? if ctx._scope.cancelled_caught: + our_uid: tuple = actor.uid # first check for and raise any remote error # before raising any context cancelled case @@ -575,8 +582,9 @@ async def _invoke( ctx._maybe_raise_remote_err(re) cs: CancelScope = ctx._scope + if cs.cancel_called: - our_uid: tuple = actor.uid + canceller: tuple = ctx.canceller msg: str = ( 'actor was cancelled by ' @@ -632,15 +640,6 @@ async def _invoke( # f' |_{ctx}' ) - # TODO: does this ever get set any more or can - # we remove it? - if ctx._cancel_msg: - msg += ( - # '------ - ------\n' - # 'IPC msg:\n' - f'\n\n{ctx._cancel_msg}' - ) - # task-contex was either cancelled by request using # ``Portal.cancel_actor()`` or ``Context.cancel()`` # on the far end, or it was cancelled by the local @@ -1753,7 +1752,9 @@ class Actor: self, cid: str, parent_chan: Channel, - requesting_uid: tuple[str, str] | None = None, + + requesting_uid: tuple[str, str]|None = None, + ipc_msg: dict|None|bool = False, ) -> bool: ''' @@ -1764,16 +1765,13 @@ class Actor: in the signature (for now). ''' - # this ctx based lookup ensures the requested task to - # be cancelled was indeed spawned by a request from - # this channel + + # this ctx based lookup ensures the requested task to be + # cancelled was indeed spawned by a request from its + # parent (or some grandparent's) channel ctx: Context func: Callable is_complete: trio.Event - - # NOTE: right now this is only implicitly called by - # streaming IPC but it should be called - # to cancel any remotely spawned task try: ( ctx, @@ -1801,20 +1799,23 @@ class Actor: log.cancel( 'Cancel request for RPC task\n\n' - f'<= ._cancel_task(): {requesting_uid}\n' - f' |_ @{ctx.dmaddr}\n\n' + f'<= Actor.cancel_task(): {requesting_uid}\n\n' + f'=> {ctx._task}\n' + f' |_ >> {ctx.repr_rpc}\n' + # f' >> Actor._cancel_task() => {ctx._task}\n' + # f' |_ {ctx._task}\n\n' # TODO: better ascii repr for "supervisor" like # a nursery or context scope? # f'=> {parent_chan}\n' - f'=> {ctx._task}\n' + # f' |_{ctx._task}\n' # TODO: simplified `Context.__repr__()` fields output # shows only application state-related stuff like, # - ._stream # - .closed # - .started_called # - .. etc. - f' >> {ctx.repr_rpc}\n' + # f' >> {ctx.repr_rpc}\n' # f' |_ctx: {cid}\n' # f' >> {ctx._nsf}()\n' ) @@ -1824,6 +1825,16 @@ class Actor: ): ctx._canceller: tuple = requesting_uid + # TODO: pack the RPC `{'cmd': }` msg into a ctxc and + # then raise and pack it here? + if ( + ipc_msg + and ctx._cancel_msg is None + ): + # assign RPC msg directly from the loop which usually + # the case with `ctx.cancel()` on the other side. + ctx._cancel_msg = ipc_msg + # don't allow cancelling this function mid-execution # (is this necessary?) if func is self._cancel_task: @@ -1904,10 +1915,15 @@ class Actor: else "IPC channel's " ) - + rent_chan_repr: str = ( + f'|_{parent_chan}' + if parent_chan + else '' + ) log.cancel( f'Cancelling {descr} {len(tasks)} rpc tasks\n\n' - f'<= .cancel_rpc_tasks(): {req_uid}\n' + f'<= `Actor.cancel_rpc_tasks()`: {req_uid}\n' + f' {rent_chan_repr}\n' # f'{self}\n' # f'{tasks_str}' ) @@ -1927,9 +1943,6 @@ class Actor: ): continue - # if func == self._cancel_task: - # continue - # TODO: this maybe block on the task cancellation # and so should really done in a nursery batch? await self._cancel_task( @@ -2339,6 +2352,8 @@ async def process_messages( await actor._cancel_task( cid, channel, + + ipc_msg=msg, ) break @@ -2449,6 +2464,7 @@ async def process_messages( # cancel it! 'parent_chan': chan, 'requesting_uid': chan.uid, + 'ipc_msg': msg, } # TODO: remove? already have emit in meth. # log.runtime( @@ -2737,7 +2753,7 @@ class Arbiter(Actor): sockaddr: tuple[str, int] for (aname, _), sockaddr in self._registry.items(): - log.info( + log.runtime( f'Actor mailbox info:\n' f'aname: {aname}\n' f'sockaddr: {sockaddr}\n'