diff --git a/tractor/_runtime.py b/tractor/_runtime.py index abdaf91a..81bab1d5 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -299,7 +299,7 @@ async def _errors_relayed_via_ipc( ) ) ): - # await pause() + # await _debug.pause() # XXX QUESTION XXX: is there any case where we'll # want to debug IPC disconnects as a default? # => I can't think of a reason that inspecting this @@ -319,6 +319,12 @@ async def _errors_relayed_via_ipc( cid=ctx.cid, ) + # NOTE: the src actor should always be packed into the + # error.. but how should we verify this? + # assert err_msg['src_actor_uid'] + # if not err_msg['error'].get('src_actor_uid'): + # import pdbp; pdbp.set_trace() + if is_rpc: try: await chan.send(err_msg) @@ -559,6 +565,7 @@ async def _invoke( # inside ._context._drain_to_final_msg()`.. # # TODO: remove this ^ right? if ctx._scope.cancelled_caught: + our_uid: tuple = actor.uid # first check for and raise any remote error # before raising any context cancelled case @@ -568,8 +575,9 @@ async def _invoke( ctx._maybe_raise_remote_err(re) cs: CancelScope = ctx._scope + if cs.cancel_called: - our_uid: tuple = actor.uid + canceller: tuple = ctx.canceller msg: str = ( 'actor was cancelled by ' @@ -625,15 +633,6 @@ async def _invoke( # f' |_{ctx}' ) - # TODO: does this ever get set any more or can - # we remove it? - if ctx._cancel_msg: - msg += ( - # '------ - ------\n' - # 'IPC msg:\n' - f'\n\n{ctx._cancel_msg}' - ) - # task-contex was either cancelled by request using # ``Portal.cancel_actor()`` or ``Context.cancel()`` # on the far end, or it was cancelled by the local @@ -1677,7 +1676,9 @@ class Actor: self, cid: str, parent_chan: Channel, - requesting_uid: tuple[str, str] | None = None, + + requesting_uid: tuple[str, str]|None = None, + ipc_msg: dict|None|bool = False, ) -> bool: ''' @@ -1688,16 +1689,13 @@ class Actor: in the signature (for now). ''' - # this ctx based lookup ensures the requested task to - # be cancelled was indeed spawned by a request from - # this channel + + # this ctx based lookup ensures the requested task to be + # cancelled was indeed spawned by a request from its + # parent (or some grandparent's) channel ctx: Context func: Callable is_complete: trio.Event - - # NOTE: right now this is only implicitly called by - # streaming IPC but it should be called - # to cancel any remotely spawned task try: ( ctx, @@ -1725,20 +1723,23 @@ class Actor: log.cancel( 'Cancel request for RPC task\n\n' - f'<= ._cancel_task(): {requesting_uid}\n' - f' |_ @{ctx.dmaddr}\n\n' + f'<= Actor.cancel_task(): {requesting_uid}\n\n' + f'=> {ctx._task}\n' + f' |_ >> {ctx.repr_rpc}\n' + # f' >> Actor._cancel_task() => {ctx._task}\n' + # f' |_ {ctx._task}\n\n' # TODO: better ascii repr for "supervisor" like # a nursery or context scope? # f'=> {parent_chan}\n' - f'=> {ctx._task}\n' + # f' |_{ctx._task}\n' # TODO: simplified `Context.__repr__()` fields output # shows only application state-related stuff like, # - ._stream # - .closed # - .started_called # - .. etc. - f' >> {ctx.repr_rpc}\n' + # f' >> {ctx.repr_rpc}\n' # f' |_ctx: {cid}\n' # f' >> {ctx._nsf}()\n' ) @@ -1748,6 +1749,16 @@ class Actor: ): ctx._canceller: tuple = requesting_uid + # TODO: pack the RPC `{'cmd': }` msg into a ctxc and + # then raise and pack it here? + if ( + ipc_msg + and ctx._cancel_msg is None + ): + # assign RPC msg directly from the loop which usually + # the case with `ctx.cancel()` on the other side. + ctx._cancel_msg = ipc_msg + # don't allow cancelling this function mid-execution # (is this necessary?) if func is self._cancel_task: @@ -1828,10 +1839,15 @@ class Actor: else "IPC channel's " ) - + rent_chan_repr: str = ( + f'|_{parent_chan}' + if parent_chan + else '' + ) log.cancel( f'Cancelling {descr} {len(tasks)} rpc tasks\n\n' - f'<= .cancel_rpc_tasks(): {req_uid}\n' + f'<= `Actor.cancel_rpc_tasks()`: {req_uid}\n' + f' {rent_chan_repr}\n' # f'{self}\n' # f'{tasks_str}' ) @@ -1851,9 +1867,6 @@ class Actor: ): continue - # if func == self._cancel_task: - # continue - # TODO: this maybe block on the task cancellation # and so should really done in a nursery batch? await self._cancel_task( @@ -2198,6 +2211,8 @@ async def process_messages( await actor._cancel_task( cid, channel, + + ipc_msg=msg, ) break @@ -2308,6 +2323,7 @@ async def process_messages( # cancel it! 'parent_chan': chan, 'requesting_uid': chan.uid, + 'ipc_msg': msg, } # TODO: remove? already have emit in meth. # log.runtime( @@ -2575,7 +2591,7 @@ class Arbiter(Actor): sockaddr: tuple[str, int] for (aname, _), sockaddr in self._registry.items(): - log.info( + log.runtime( f'Actor mailbox info:\n' f'aname: {aname}\n' f'sockaddr: {sockaddr}\n'