diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index d127d9d..516c290 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -322,7 +322,7 @@ async def _invoke(
             else f'result: {ctx._result}'
         )
         log.cancel(
-            f'IPC context terminated with final {res_str}\n'
+            f'IPC context terminated with final {res_str}\n\n'
             f'|_{pformat(ctx)}\n'
         )

@@ -1022,14 +1022,14 @@ class Actor:
                 and poll() is None
             ):
                 log.cancel(
-                    f'Peer actor IPC broke but proc is alive?\n'
-                    f'uid: {uid}\n'
-                    f'|_{proc}\n'
+                    f'Peer IPC broke but subproc is alive?\n\n'
+
+                    f'<=x @{chan.raddr}\n'
+                    f' |_{proc}\n'
                 )

         # ``Channel`` teardown and closure sequence
-
-        # Drop ref to channel so it can be gc-ed and disconnected
+        # drop ref to channel so it can be gc-ed and disconnected
         log.runtime(
             f'Disconnected IPC channel:\n'
             f'uid: {chan.uid}\n'
@@ -1177,8 +1177,12 @@ class Actor:
             ctx: Context = self._contexts[(uid, cid)]
         except KeyError:
             log.warning(
-                f'Ignoring msg from [no-longer/un]known context {uid}:'
-                f'\n{msg}')
+                'Ignoring invalid IPC ctx msg!\n\n'
+                f'<= sender: {uid}\n'
+                f'=> cid: {cid}\n\n'
+
+                f'{msg}\n'
+            )
             return

         return await ctx._deliver_msg(msg)
@@ -1381,9 +1385,12 @@ class Actor:

         except OSError:  # failed to connect
             log.warning(
-                f"Failed to connect to parent @ {parent_addr},"
-                " closing server")
-            await self.cancel(requesting_uid=self.uid)
+                f'Failed to connect to parent!?\n\n'
+                'Closing IPC [TCP] transport server to\n'
+                f'{parent_addr}\n'
+                f'|_{self}\n\n'
+            )
+            await self.cancel(req_chan=None)  # self cancel
             raise

     async def _serve_forever(
@@ -1451,29 +1458,53 @@ class Actor:
             assert self._service_n
             self._service_n.start_soon(
                 self.cancel,
-                self.uid,
+                None,  # self cancel all rpc tasks
             )

     async def cancel(
         self,
-        requesting_uid: tuple[str, str],
+
+        # chan whose lifetime limits the lifetime of its remotely
+        # requested and locally spawned RPC tasks - similar to the
+        # supervision semantics of a nursery wherein the actual
+        # implementation does start all such tasks in
+        # a sub-nursery.
+        req_chan: Channel|None,

     ) -> bool:
         '''
-        Cancel this actor's runtime.
+        Cancel this actor's runtime, eventually resulting in
+        the exit of its containing process.

-        The "deterministic" teardown sequence in order is:
-          - cancel all ongoing rpc tasks by cancel scope
-          - cancel the channel server to prevent new inbound
-            connections
-          - cancel the "service" nursery reponsible for
-            spawning new rpc tasks
-          - return control the parent channel message loop
+        The ideal "deterministic" teardown sequence in order is:
+          - cancel all ongoing rpc tasks by cancel scope
+          - cancel the channel server to prevent new inbound
+            connections
+          - cancel the "service" nursery responsible for
+            spawning new rpc tasks
+          - return control to the parent channel message loop

         '''
-        log.cancel(
-            f'{self.uid} requested to cancel by:\n'
-            f'{requesting_uid}'
+        (
+            requesting_uid,
+            requester_type,
+            req_chan,
+
+        ) = (
+            req_chan.uid,
+            'peer',
+            req_chan,
+
+        ) if req_chan else (
+
+            # a self cancel of ALL rpc tasks
+            self.uid,
+            'self',
+            self
+        )
+        msg: str = (
+            f'`Actor.cancel()` request from {requester_type}:\n'
+            f'<= {requesting_uid}\n'
         )

         # TODO: what happens here when we self-cancel tho?
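
Reviewer note: the `req_chan`-based requester resolution above collapses the "peer cancel" and "self cancel" cases into a single conditional tuple assignment. A minimal standalone sketch of that pattern, assuming a stand-in `FakeChannel` and made-up uids (none of these names are part of the patch):

    # sketch only: mirror the requester-resolution logic from
    # `Actor.cancel()` above without any runtime machinery.
    class FakeChannel:
        def __init__(self, uid: tuple[str, str]):
            self.uid = uid

    def resolve_requester(
        self_uid: tuple[str, str],
        req_chan: FakeChannel | None,
    ) -> tuple[tuple[str, str], str]:
        # a peer actor requested the cancel over its IPC channel
        if req_chan:
            return req_chan.uid, 'peer'
        # no channel passed => the actor is cancelling itself
        return self_uid, 'self'

    assert resolve_requester(('root', '1'), None) == (('root', '1'), 'self')
    assert resolve_requester(
        ('root', '1'),
        FakeChannel(('child', '2')),
    ) == (('child', '2'), 'peer')
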
@@ -1487,12 +1518,16 @@ class Actor:
         # with the root actor in this tree
         dbcs = _debug.Lock._debugger_request_cs
         if dbcs is not None:
-            log.cancel("Cancelling active debugger request")
+            msg += (
+                '>> Cancelling active debugger request..\n'
+                f'|_{_debug.Lock}\n'
+            )
             dbcs.cancel()

-        # kill all ongoing tasks
+        # self-cancel **all** ongoing RPC tasks
         await self.cancel_rpc_tasks(
-            requesting_uid=requesting_uid,
+            req_uid=requesting_uid,
+            parent_chan=None,
         )

         # stop channel server
@@ -1501,13 +1536,14 @@ class Actor:
             await self._server_down.wait()
         else:
             log.warning(
-                f'{self.uid} was likely cancelled before it started')
+                'Transport[TCP] server was cancelled before it started?'
+            )

         # cancel all rpc tasks permanently
         if self._service_n:
             self._service_n.cancel_scope.cancel()

-        log.cancel(f"{self.uid} called `Actor.cancel()`")
+        log.cancel(msg)
         self._cancel_complete.set()
         return True

@@ -1522,7 +1558,7 @@
     async def _cancel_task(
         self,
         cid: str,
-        chan: Channel,
+        parent_chan: Channel,
         requesting_uid: tuple[str, str] | None = None,

     ) -> bool:
@@ -1534,13 +1570,25 @@ class Actor:
         in the signature (for now).

         '''
-        # right now this is only implicitly called by
+        # this ctx based lookup ensures the requested task to
+        # be cancelled was indeed spawned by a request from
+        # this channel
+        ctx: Context
+        func: Callable
+        is_complete: trio.Event
+
+        # NOTE: right now this is only implicitly called by
         # streaming IPC but it should be called
         # to cancel any remotely spawned task
         try:
-            # this ctx based lookup ensures the requested task to
-            # be cancelled was indeed spawned by a request from this channel
-            ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
+            (
+                ctx,
+                func,
+                is_complete,
+            ) = self._rpc_tasks[(
+                parent_chan,
+                cid,
+            )]
             scope: CancelScope = ctx._scope

         except KeyError:
@@ -1551,17 +1599,28 @@ class Actor:
             # - callee errors prior to cancel req.
             log.cancel(
                 'Cancel request invalid, RPC task already completed?\n'
-                f'<= canceller: {requesting_uid}\n'
-                f' |_{chan}\n\n'
-
-                f'=> ctx id: {cid}\n'
+                f'<= canceller: {requesting_uid}\n\n'
+                f'=>{parent_chan}\n'
+                f' |_ctx-id: {cid}\n'
             )
             return True

         log.cancel(
-            f"Cancelling task:\ncid: {cid}\nfunc: {func}\n"
-            f"peer: {chan.uid}\n")
+            'Cancel request for RPC task\n'
+            f'<= canceller: {requesting_uid}\n\n'
+            # TODO: better ascii repr for "supervisor" like
+            # a nursery or context scope?
+            f'=> ipc-parent: {parent_chan}\n'
+            # TODO: simplified `Context.__repr__()` fields output
+            # shows only application state-related stuff like,
+            # - ._stream
+            # - .closed
+            # - .started_called
+            # - .. etc.
+            f' |_ctx: {cid}\n'
+            f' >> {ctx._nsf}()\n'
+        )
         if (
             ctx._canceller is None
             and requesting_uid
@@ -1571,6 +1630,7 @@
         # don't allow cancelling this function mid-execution
         # (is this necessary?)
         if func is self._cancel_task:
+            log.error('Do not cancel a cancel!?')
             return True

         # TODO: shouldn't we eventually be calling ``Context.cancel()``
@@ -1580,23 +1640,29 @@
         scope.cancel()

         # wait for _invoke to mark the task complete
+        flow_info: str = (
+            f'<= canceller: {requesting_uid}\n'
+            f'=> ipc-parent: {parent_chan}\n'
+            f' |_{ctx}\n'
+        )
         log.runtime(
-            'Waiting on task to cancel:\n'
-            f'cid: {cid}\nfunc: {func}\n'
-            f'peer: {chan.uid}\n'
+            'Waiting on RPC task to cancel\n'
+            f'{flow_info}'
         )
         await is_complete.wait()
-
         log.runtime(
-            f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n"
-            f"peer: {chan.uid}\n")
-
+            f'Successfully cancelled RPC task\n'
+            f'{flow_info}'
+        )
         return True

     async def cancel_rpc_tasks(
         self,
-        only_chan: Channel | None = None,
-        requesting_uid: tuple[str, str] | None = None,
+        req_uid: tuple[str, str],
+
+        # NOTE: when None is passed we cancel **all** rpc
+        # tasks running in this actor!
+        parent_chan: Channel|None,

     ) -> None:
         '''
@@ -1605,38 +1671,76 @@ class Actor:
         tasks: dict = self._rpc_tasks
-        if tasks:
-            tasks_str: str = ''
-            for (ctx, func, _) in tasks.values():
-                tasks_str += (
-                    f' |_{func.__name__}() [cid={ctx.cid[-6:]}..]\n'
-                )
-
-            log.cancel(
-                f'Cancelling all {len(tasks)} rpc tasks:\n'
-                f'{tasks_str}'
+        if not tasks:
+            log.warning(
+                'Actor has no cancellable RPC tasks?\n'
+                f'<= cancel requester: {req_uid}\n'
+                f'=> {self}\n\n'
             )
-            for (
-                (chan, cid),
-                (ctx, func, is_complete),
-            ) in tasks.copy().items():
-                if only_chan is not None:
-                    if only_chan != chan:
-                        continue
+            return

-                # TODO: this should really done in a nursery batch
-                if func != self._cancel_task:
-                    await self._cancel_task(
-                        cid,
-                        chan,
-                        requesting_uid=requesting_uid,
-                    )
+        # TODO: seriously factor this into some helper funcs XD
+        tasks_str: str = ''
+        for (ctx, func, _) in tasks.values():

-            log.cancel(
-                'Waiting for remaining rpc tasks to complete:\n'
-                f'{tasks}'
+            # TODO: std repr of all primitives in
+            # a hierarchical tree format, since we can!!
+            # like => repr for funcs/addrs/msg-typing:
+            #
+            # -[ ] use a proper utf8 "arm" like
+            #     `stackscope` has!
+            # -[ ] for typed msging, show the
+            #     py-type-annot style?
+            #  - maybe auto-gen via `inspect` / `typing` type-sig:
+            #    https://stackoverflow.com/a/57110117
+            #    => see ex. code pasted into `.msg.types`
+            #
+            # -[ ] proper .maddr() for IPC primitives?
+            #  - `Channel.maddr() -> str:` obvi!
+            #  - `Context.maddr() -> str:`
+            tasks_str += (
+                f' |_@ /ipv4/tcp/cid="{ctx.cid[-16:]} .."\n'
+                f' |>> {ctx._nsf}() -> dict:\n'
             )
-            await self._ongoing_rpc_tasks.wait()
+
+        log.cancel(
+            f'Cancelling all {len(tasks)} rpc tasks:\n\n'
+            f'<= .cancel() from {req_uid}\n'
+            f'{self}\n'
+            f'{tasks_str}'
+        )
+        for (
+            (task_caller_chan, cid),
+            (ctx, func, is_complete),
+        ) in tasks.copy().items():
+
+            if (
+                # maybe filter to specific IPC channel?
+                (parent_chan
+                 and
+                 task_caller_chan != parent_chan)
+
+                # never "cancel-a-cancel" XD
+                or (func == self._cancel_task)
+            ):
+                continue
+
+            # if func == self._cancel_task:
+            #     continue
+
+            # TODO: this may block on the task cancellation
+            # and so should really be done in a nursery batch?
+            await self._cancel_task(
+                cid,
+                task_caller_chan,
+                requesting_uid=req_uid,
+            )
+
+        log.cancel(
+            'Waiting for remaining rpc tasks to complete\n'
+            f'|_{tasks}'
+        )
+        await self._ongoing_rpc_tasks.wait()

     def cancel_server(self) -> None:
         '''
@@ -2092,10 +2196,11 @@ async def process_messages(
                     f'=> {ns}.{funcname}({kwargs})\n'
                 )
                 if ns == 'self':
-                    uid: tuple = chan.uid
                     if funcname == 'cancel':
                         func: Callable = actor.cancel
-                        kwargs['requesting_uid'] = uid
+                        kwargs |= {
+                            'req_chan': chan,
+                        }

                         # don't start entire actor runtime cancellation
                         # if this actor is currently in debug mode!
@@ -2109,11 +2214,6 @@
                         # and immediately start the core runtime
                         # machinery shutdown!
                         with CancelScope(shield=True):
-                            log.cancel(
-                                f'Cancel request for `Actor` runtime\n'
-                                f'<= canceller: {uid}\n'
-                                # f'=> uid: {actor.uid}\n'
-                            )
                             await _invoke(
                                 actor,
                                 cid,
@@ -2123,25 +2223,32 @@
                                 is_rpc=False,
                             )

-                        log.cancel(
-                            f'Cancelling IPC msg-loop with {chan.uid}'
+                        log.runtime(
+                            'Cancelling IPC transport msg-loop with peer:\n'
+                            f'|_{chan}\n'
                         )
                         loop_cs.cancel()
                         break

                     if funcname == '_cancel_task':
-                        func = actor._cancel_task
+                        func: Callable = actor._cancel_task

                         # we immediately start the runtime machinery
                         # shutdown
                         # with CancelScope(shield=True):
-                        kwargs['chan'] = chan
-                        target_cid = kwargs['cid']
-                        kwargs['requesting_uid'] = chan.uid
+                        target_cid: str = kwargs['cid']
+                        kwargs |= {
+                            # NOTE: ONLY the rpc-task-owning
+                            # parent IPC channel should be able to
+                            # cancel it!
+                            'parent_chan': chan,
+                            'requesting_uid': chan.uid,
+                        }
                         log.cancel(
                             f'Rx task cancel request\n'
                             f'<= canceller: {chan.uid}\n'
-                            f'=> uid: {actor.uid}\n'
+                            f' |_{chan}\n\n'
+                            f'=> {actor}\n'
                             f' |_cid: {target_cid}\n'
                         )
                         try:
@@ -2154,8 +2261,13 @@
                             await _invoke(
                                 actor,
                                 cid,
                                 func,
                                 kwargs,
                                 is_rpc=False,
                             )
                         except BaseException:
-                            log.exception("failed to cancel task?")
-
+                            log.exception(
+                                'Failed to cancel task?\n'
+                                f'<= canceller: {chan.uid}\n'
+                                f' |_{chan}\n\n'
+                                f'=> {actor}\n'
+                                f' |_cid: {target_cid}\n'
+                            )
                         continue
                 else:
                     # normally registry methods, eg.
@@ -2174,9 +2286,25 @@
                     await chan.send(err_msg)
                     continue

-                # spin up a task for the requested function
-                log.runtime(f"Spawning task for {func}")
-                assert actor._service_n
+                # schedule a task for the requested RPC function
+                # in the actor's main "service nursery".
+                # TODO: possibly a service-tn per IPC channel for
+                # supervision isolation? would avoid having to
+                # manage RPC tasks individually in `._rpc_tasks`
+                # table?
+                log.runtime(
+                    f'Spawning task for RPC request\n'
+                    f'<= caller: {chan.uid}\n'
+                    f' |_{chan}\n\n'
+                    # TODO: maddr style repr?
+                    # f' |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/'
+                    # f'cid="{cid[-16:]} .."\n\n'
+
+                    f'=> {actor}\n'
+                    f' |_cid: {cid}\n'
+                    f' |>> {func}()\n'
+                )
+                assert actor._service_n  # wait why? do it at top?
                 try:
                     ctx: Context = await actor._service_n.start(
                         partial(
@@ -2234,7 +2362,13 @@
             log.runtime(
                 f"{chan} for {chan.uid} disconnected, cancelling tasks"
             )
-            await actor.cancel_rpc_tasks(chan)
+            await actor.cancel_rpc_tasks(
+                req_uid=actor.uid,
+                # a "self cancel" in terms of the lifetime of the
+                # IPC connection which is presumed to be the
+                # source of any requests for spawned tasks.
+                parent_chan=chan,
+            )

     except (
         TransportClosed,
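
Reviewer note: the new `cancel_rpc_tasks(req_uid, parent_chan)` semantics reduce to a small filter over the task table: `parent_chan=None` means "cancel every non-cancel task" (a self cancel), while a specific channel restricts cancellation to the tasks it spawned, and `_cancel_task` requests are never themselves cancelled. A pure-function sketch of that filter, using an illustrative task-table shape that is not the actual `Actor._rpc_tasks` structure:

    # sketch only: which (chan, cid) entries would be cancelled for
    # a given `parent_chan` filter, mirroring the loop added above.
    def tasks_to_cancel(
        tasks: dict,                       # {(chan, cid): func_name}
        parent_chan: object | None,
        cancel_task_name: str = '_cancel_task',
    ) -> list:
        out = []
        for (chan, cid), func_name in tasks.items():
            # maybe filter to a specific IPC channel
            if parent_chan and chan != parent_chan:
                continue
            # never "cancel-a-cancel"
            if func_name == cancel_task_name:
                continue
            out.append((chan, cid))
        return out

    tasks = {
        ('chan-A', 'cid-1'): 'stream_data',
        ('chan-B', 'cid-2'): 'compute',
        ('chan-B', 'cid-3'): '_cancel_task',
    }
    # parent_chan=None => every non-cancel task, i.e. a "self cancel"
    assert tasks_to_cancel(tasks, None) == [('chan-A', 'cid-1'), ('chan-B', 'cid-2')]
    # restrict to tasks spawned via chan-B only
    assert tasks_to_cancel(tasks, 'chan-B') == [('chan-B', 'cid-2')]
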