diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 1d89c63d..fda3a5c5 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -234,7 +234,7 @@ class Actor: # state self._cancel_complete = trio.Event() - self._cancel_called_by_remote: tuple[str, tuple]|None = None + self._cancel_called_by: tuple[str, tuple]|None = None self._cancel_called: bool = False # retreive and store parent `__main__` data which @@ -346,69 +346,118 @@ class Actor: def pid(self) -> int: return self._aid.pid + @property + def repr_state(self) -> str: + if self.cancel_complete: + return 'cancelled' + + elif canceller := self.cancel_caller: + return f' and cancel-called by {canceller}' + + else: + return 'running' + def pformat( self, ds: str = ': ', indent: int = 0, + privates: bool = False, ) -> str: - fields_sect_prefix: str = ' |_' - parent_uid: tuple|None = None + + fmtstr: str = f'|_id: {self.aid.reprol()!r}\n' + if privates: + aid_nest_prefix: str = '|_aid=' + aid_field_repr: str = _pformat.nest_from_op( + input_op='', + text=pretty_struct.pformat( + struct=self.aid, + field_indent=2, + ), + op_suffix='', + nest_prefix=aid_nest_prefix, + nest_indent=0, + ) + fmtstr: str = f'{aid_field_repr}' + if rent_chan := self._parent_chan: - parent_uid = rent_chan.uid + fmtstr += ( + f"|_parent{ds}{rent_chan.aid.reprol()}\n" + ) - peers: list = [] server: _server.IPCServer = self.ipc_server - ipc_server_sect: str = '' if server: - peers: list[tuple] = list(server._peer_connected) + if privates: + server_repr: str = self._ipc_server.pformat( + privates=privates, + ) + # create field ln as a key-header indented under + # and up to the section's key prefix. + # ^XXX if we were to indent `repr(Server)` to + # ': ' + # _here_^ + server_repr: str = _pformat.nest_from_op( + input_op='', # nest as sub-obj + op_suffix='', + text=server_repr, + ) + fmtstr += ( + f"{server_repr}" + ) + else: + fmtstr += ( + f'|_ipc: {server.repr_state!r}\n' + ) - # create field ln as a key-header indented under - # and up to the section's key prefix. - # field_ln_header: str = textwrap.indent( - # text=f"ipc_server{ds}", - # prefix=' '*len(fields_sect_prefix), - # ) - # ^XXX if we were to indent `repr(Server)` to - # ': ' - # _here_^ - server_repr: str = textwrap.indent( - text=self._ipc_server.pformat(), - # prefix=' '*len(field_ln_header), - prefix=' '*len(fields_sect_prefix), - ) - ipc_server_sect: str = ( - # f'{field_ln_header}\n' - f'{server_repr}' - ) - - fmtstr: str = ( - f' |_id: {self.aid!r}\n' - # f" aid{ds}{self.aid!r}\n" - f" parent{ds}{parent_uid}\n" - # f'\n' - f' |_ipc: {len(peers)!r} connected peers\n' - f" peers{ds}{peers!r}\n" - f"{ipc_server_sect}" - # f'\n' - f' |_rpc: {len(self._rpc_tasks)} tasks\n' - f" ctxs{ds}{len(self._contexts)}\n" - # f'\n' - f' |_runtime: ._task{ds}{self._task!r}\n' - f' _spawn_method{ds}{self._spawn_method}\n' - f' _actoruid2nursery{ds}{self._actoruid2nursery}\n' - f' _forkserver_info{ds}{self._forkserver_info}\n' - # f'\n' - f' |_state: "TODO: .repr_state()"\n' - f' _cancel_complete{ds}{self._cancel_complete}\n' - f' _cancel_called_by_remote{ds}{self._cancel_called_by_remote}\n' - f' _cancel_called{ds}{self._cancel_called}\n' + fmtstr += ( + f'|_rpc: {len(self._rpc_tasks)} active tasks\n' ) + + # TODO, actually fix the .repr_state impl/output? + # append ipc-ctx state summary + # ctxs: dict = self._contexts + # if ctxs: + # ctx_states: dict[str, int] = {} + # for ctx in self._contexts.values(): + # ctx_state: str = ctx.repr_state + # cnt = ctx_states.setdefault(ctx_state, 0) + # ctx_states[ctx_state] = cnt + 1 + + # fmtstr += ( + # f" ctxs{ds}{ctx_states}\n" + # ) + + # runtime-state + task_name: str = '' + if task := self._task: + task_name: str = task.name + fmtstr += ( + # TODO, this just like ctx? + f'|_state: {self.repr_state!r}\n' + f' task: {task_name}\n' + f' loglevel: {self.loglevel!r}\n' + f' subactors_spawned: {len(self._actoruid2nursery)}\n' + ) + if not _state.is_root_process(): + fmtstr += f' spawn_method: {self._spawn_method!r}\n' + + if privates: + fmtstr += ( + # f' actoruid2nursery{ds}{self._actoruid2nursery}\n' + f' cancel_complete{ds}{self._cancel_complete}\n' + f' cancel_called_by_remote{ds}{self._cancel_called_by}\n' + f' cancel_called{ds}{self._cancel_called}\n' + ) + + if fmtstr: + fmtstr: str = textwrap.indent( + text=fmtstr, + prefix=' '*(1 + indent), + ) + _repr: str = ( - '\n' + f'<{type(self).__name__}(\n' + f'{fmtstr}' + f')>\n' ) if indent: _repr: str = textwrap.indent( @@ -533,11 +582,11 @@ class Actor: queue. ''' - uid: tuple[str, str] = chan.uid - assert uid, f"`chan.uid` can't be {uid}" + aid: msgtypes.Aid = chan.aid + assert aid, f"`chan.aid` can't be {aid}" try: ctx: Context = self._contexts[( - uid, + aid.uid, cid, # TODO: how to determine this tho? @@ -548,7 +597,7 @@ class Actor: 'Ignoring invalid IPC msg!?\n' f'Ctx seems to not/no-longer exist??\n' f'\n' - f'<=? {uid}\n' + f'<=? {aid.reprol()!r}\n' f' |_{pretty_struct.pformat(msg)}\n' ) match msg: @@ -597,6 +646,7 @@ class Actor: msging session's lifetime. ''' + # ?TODO, use Aid here as well? actor_uid = chan.uid assert actor_uid try: @@ -945,6 +995,22 @@ class Actor: None, # self cancel all rpc tasks ) + @property + def cancel_complete(self) -> bool: + return self._cancel_complete.is_set() + + @property + def cancel_called(self) -> bool: + ''' + Was this actor requested to cancel by a remote peer actor. + + ''' + return self._cancel_called_by is not None + + @property + def cancel_caller(self) -> msgtypes.Aid|None: + return self._cancel_called_by + async def cancel( self, @@ -969,20 +1035,18 @@ class Actor: ''' ( - requesting_uid, - requester_type, + requesting_aid, # Aid + requester_type, # str req_chan, log_meth, ) = ( - req_chan.uid, + req_chan.aid, 'peer', req_chan, log.cancel, - ) if req_chan else ( - # a self cancel of ALL rpc tasks - self.uid, + self.aid, 'self', self, log.runtime, @@ -990,14 +1054,14 @@ class Actor: # TODO: just use the new `Context.repr_rpc: str` (and # other) repr fields instead of doing this all manual.. msg: str = ( - f'Actor-runtime cancel request from {requester_type}\n\n' - f'<=c) {requesting_uid}\n' - f' |_{self}\n' + f'Actor-runtime cancel request from {requester_type!r}\n' f'\n' + f'<=c)\n' + f'{self}' ) # TODO: what happens here when we self-cancel tho? - self._cancel_called_by_remote: tuple = requesting_uid + self._cancel_called_by: tuple = requesting_aid self._cancel_called = True # cancel all ongoing rpc tasks @@ -1025,7 +1089,7 @@ class Actor: # self-cancel **all** ongoing RPC tasks await self.cancel_rpc_tasks( - req_uid=requesting_uid, + req_aid=requesting_aid, parent_chan=None, ) @@ -1054,8 +1118,7 @@ class Actor: self, cid: str, parent_chan: Channel, - requesting_uid: tuple[str, str]|None, - # ^^TODO! use the `Aid` directly here! + requesting_aid: msgtypes.Aid|None, ipc_msg: dict|None|bool = False, @@ -1093,7 +1156,7 @@ class Actor: log.runtime( 'Cancel request for invalid RPC task.\n' 'The task likely already completed or was never started!\n\n' - f'<= canceller: {requesting_uid}\n' + f'<= canceller: {requesting_aid}\n' f'=> {cid}@{parent_chan.uid}\n' f' |_{parent_chan}\n' ) @@ -1101,7 +1164,7 @@ class Actor: log.cancel( 'Rxed cancel request for RPC task\n' - f'{ctx._task!r} <=c) {requesting_uid}\n' + f'{ctx._task!r} <=c) {requesting_aid}\n' f'|_>> {ctx.repr_rpc}\n' # f'|_{ctx._task}\n' @@ -1127,9 +1190,9 @@ class Actor: ) if ( ctx._canceller is None - and requesting_uid + and requesting_aid ): - ctx._canceller: tuple = requesting_uid + ctx._canceller: tuple = requesting_aid.uid # TODO: pack the RPC `{'cmd': }` msg into a ctxc and # then raise and pack it here? @@ -1155,7 +1218,7 @@ class Actor: # wait for _invoke to mark the task complete flow_info: str = ( - f'<= canceller: {requesting_uid}\n' + f'<= canceller: {requesting_aid}\n' f'=> ipc-parent: {parent_chan}\n' f'|_{ctx}\n' ) @@ -1172,7 +1235,7 @@ class Actor: async def cancel_rpc_tasks( self, - req_uid: tuple[str, str], + req_aid: msgtypes.Aid, # NOTE: when None is passed we cancel **all** rpc # tasks running in this actor! @@ -1189,7 +1252,7 @@ class Actor: if not tasks: log.runtime( 'Actor has no cancellable RPC tasks?\n' - f'<= canceller: {req_uid}\n' + f'<= canceller: {req_aid.reprol()}\n' ) return @@ -1229,7 +1292,7 @@ class Actor: ) log.cancel( f'Cancelling {descr} RPC tasks\n\n' - f'<=c) {req_uid} [canceller]\n' + f'<=c) {req_aid} [canceller]\n' f'{rent_chan_repr}' f'c)=> {self.uid} [cancellee]\n' f' |_{self} [with {len(tasks)} tasks]\n' @@ -1257,7 +1320,7 @@ class Actor: await self._cancel_task( cid, task_caller_chan, - requesting_uid=req_uid, + requesting_aid=req_aid, ) if tasks: @@ -1554,8 +1617,9 @@ async def async_main( # 'Blocking on service nursery to exit..\n' ) log.runtime( - "Service nursery complete\n" - "Waiting on root nursery to complete" + 'Service nursery complete\n' + '\n' + '-> Waiting on root nursery to complete' ) # Blocks here as expected until the root nursery is @@ -1705,7 +1769,7 @@ async def async_main( ) teardown_report += ( 'Actor runtime exited\n' - f'{op_nested_actor_repr}\n' + f'{op_nested_actor_repr}' ) log.info(teardown_report)