Refine `Actor` status iface, use `Aid` throughout
To simplify `.pformat()` output when the new `privates: bool` is unset (the default) this adds new public attrs to wrap an actor's cancellation status as well as provide a `.repr_state: str` (similar to our equiv on `Context`). Rework `.pformat()` to render a much simplified repr using all these new refinements. Further, port the `.cancel()` method to use `.msg.types.Aid` for all internal `requesting_uid` refs (now renamed with `_aid`) and in all called downstream methods. New cancel-state iface deats, - rename `._cancel_called_by_remote` -> `._cancel_called_by` and expect it to be set as an `Aid`. - add `.cancel_complete: bool` which flags whether `.cancel()` ran to completion. - add `.cancel_called: bool` which just wraps `._cancel_called` (and which likely will just be dropped since we already have `._cancel_called_by`). - add `.cancel_caller: Aid|None` which wraps `._cancel_called_by`. In terms of using `Aid` in cancel methods, - rename vars with `_aid` suffix in `.cancel()` (and wherever else). - change `.cancel_rpc_tasks()` input param to `req_aid: msgtypes.Aid`. - do the same for `._cancel_task()` and (for now until we adjust its internals as well) use the `Aid.uid` remap property when assigning `Context._canceller`. - adjust all log msg refs to match obvi.to_asyncio_eoc_signal
							parent
							
								
									1592f7e6be
								
							
						
					
					
						commit
						a890e9aa83
					
				|  | @ -234,7 +234,7 @@ class Actor: | |||
| 
 | ||||
|         # state | ||||
|         self._cancel_complete = trio.Event() | ||||
|         self._cancel_called_by_remote: tuple[str, tuple]|None = None | ||||
|         self._cancel_called_by: tuple[str, tuple]|None = None | ||||
|         self._cancel_called: bool = False | ||||
| 
 | ||||
|         # retreive and store parent `__main__` data which | ||||
|  | @ -346,69 +346,118 @@ class Actor: | |||
|     def pid(self) -> int: | ||||
|         return self._aid.pid | ||||
| 
 | ||||
|     @property | ||||
|     def repr_state(self) -> str: | ||||
|         if self.cancel_complete: | ||||
|             return 'cancelled' | ||||
| 
 | ||||
|         elif canceller := self.cancel_caller: | ||||
|                 return f' and cancel-called by {canceller}' | ||||
| 
 | ||||
|         else: | ||||
|             return 'running' | ||||
| 
 | ||||
|     def pformat( | ||||
|         self, | ||||
|         ds: str = ': ', | ||||
|         indent: int = 0, | ||||
|         privates: bool = False, | ||||
|     ) -> str: | ||||
|         fields_sect_prefix: str = ' |_' | ||||
|         parent_uid: tuple|None = None | ||||
| 
 | ||||
|         fmtstr: str = f'|_id: {self.aid.reprol()!r}\n' | ||||
|         if privates: | ||||
|             aid_nest_prefix: str = '|_aid=' | ||||
|             aid_field_repr: str = _pformat.nest_from_op( | ||||
|                 input_op='', | ||||
|                 text=pretty_struct.pformat( | ||||
|                     struct=self.aid, | ||||
|                     field_indent=2, | ||||
|                 ), | ||||
|                 op_suffix='', | ||||
|                 nest_prefix=aid_nest_prefix, | ||||
|                 nest_indent=0, | ||||
|             ) | ||||
|             fmtstr: str = f'{aid_field_repr}' | ||||
| 
 | ||||
|         if rent_chan := self._parent_chan: | ||||
|             parent_uid = rent_chan.uid | ||||
|             fmtstr += ( | ||||
|                 f"|_parent{ds}{rent_chan.aid.reprol()}\n" | ||||
|             ) | ||||
| 
 | ||||
|         peers: list = [] | ||||
|         server: _server.IPCServer = self.ipc_server | ||||
|         ipc_server_sect: str = '' | ||||
|         if server: | ||||
|             peers: list[tuple] = list(server._peer_connected) | ||||
|             if privates: | ||||
|                 server_repr: str = self._ipc_server.pformat( | ||||
|                     privates=privates, | ||||
|                 ) | ||||
|                 # create field ln as a key-header indented under | ||||
|                 # and up to the section's key prefix. | ||||
|                 # ^XXX if we were to indent `repr(Server)` to | ||||
|                 # '<key>: ' | ||||
|                 #  _here_^ | ||||
|                 server_repr: str = _pformat.nest_from_op( | ||||
|                     input_op='',  # nest as sub-obj | ||||
|                     op_suffix='', | ||||
|                     text=server_repr, | ||||
|                 ) | ||||
|                 fmtstr += ( | ||||
|                     f"{server_repr}" | ||||
|                 ) | ||||
|             else: | ||||
|                 fmtstr += ( | ||||
|                     f'|_ipc: {server.repr_state!r}\n' | ||||
|                 ) | ||||
| 
 | ||||
|             # create field ln as a key-header indented under | ||||
|             # and up to the section's key prefix. | ||||
|             # field_ln_header: str = textwrap.indent( | ||||
|             #     text=f"ipc_server{ds}", | ||||
|             #     prefix=' '*len(fields_sect_prefix), | ||||
|             # ) | ||||
|             # ^XXX if we were to indent `repr(Server)` to | ||||
|             # '<key>: ' | ||||
|             #  _here_^ | ||||
|             server_repr: str = textwrap.indent( | ||||
|                 text=self._ipc_server.pformat(), | ||||
|                 # prefix=' '*len(field_ln_header), | ||||
|                 prefix=' '*len(fields_sect_prefix), | ||||
|             ) | ||||
|             ipc_server_sect: str = ( | ||||
|                 # f'{field_ln_header}\n' | ||||
|                 f'{server_repr}' | ||||
|             ) | ||||
| 
 | ||||
|         fmtstr: str = ( | ||||
|             f' |_id: {self.aid!r}\n' | ||||
|             # f"   aid{ds}{self.aid!r}\n" | ||||
|             f"   parent{ds}{parent_uid}\n" | ||||
|             # f'\n' | ||||
|             f' |_ipc: {len(peers)!r} connected peers\n' | ||||
|             f"   peers{ds}{peers!r}\n" | ||||
|             f"{ipc_server_sect}" | ||||
|             # f'\n' | ||||
|             f' |_rpc: {len(self._rpc_tasks)} tasks\n' | ||||
|             f"   ctxs{ds}{len(self._contexts)}\n" | ||||
|             # f'\n' | ||||
|             f' |_runtime: ._task{ds}{self._task!r}\n' | ||||
|             f'   _spawn_method{ds}{self._spawn_method}\n' | ||||
|             f'   _actoruid2nursery{ds}{self._actoruid2nursery}\n' | ||||
|             f'   _forkserver_info{ds}{self._forkserver_info}\n' | ||||
|             # f'\n' | ||||
|             f' |_state: "TODO: .repr_state()"\n' | ||||
|             f'   _cancel_complete{ds}{self._cancel_complete}\n' | ||||
|             f'   _cancel_called_by_remote{ds}{self._cancel_called_by_remote}\n' | ||||
|             f'   _cancel_called{ds}{self._cancel_called}\n' | ||||
|         fmtstr += ( | ||||
|             f'|_rpc: {len(self._rpc_tasks)} active tasks\n' | ||||
|         ) | ||||
| 
 | ||||
|         # TODO, actually fix the .repr_state impl/output? | ||||
|         # append ipc-ctx state summary | ||||
|         # ctxs: dict = self._contexts | ||||
|         # if ctxs: | ||||
|         #     ctx_states: dict[str, int] = {} | ||||
|         #     for ctx in self._contexts.values(): | ||||
|         #         ctx_state: str = ctx.repr_state | ||||
|         #         cnt = ctx_states.setdefault(ctx_state, 0) | ||||
|         #         ctx_states[ctx_state] = cnt + 1 | ||||
| 
 | ||||
|         #     fmtstr += ( | ||||
|         #         f"  ctxs{ds}{ctx_states}\n" | ||||
|         #     ) | ||||
| 
 | ||||
|         # runtime-state | ||||
|         task_name: str = '<dne>' | ||||
|         if task := self._task: | ||||
|             task_name: str = task.name | ||||
|         fmtstr += ( | ||||
|             # TODO, this just like ctx? | ||||
|             f'|_state: {self.repr_state!r}\n' | ||||
|             f'  task: {task_name}\n' | ||||
|             f'  loglevel: {self.loglevel!r}\n' | ||||
|             f'  subactors_spawned: {len(self._actoruid2nursery)}\n' | ||||
|         ) | ||||
|         if not _state.is_root_process(): | ||||
|             fmtstr += f'  spawn_method: {self._spawn_method!r}\n' | ||||
| 
 | ||||
|         if privates: | ||||
|             fmtstr += ( | ||||
|                 # f'  actoruid2nursery{ds}{self._actoruid2nursery}\n' | ||||
|                 f'  cancel_complete{ds}{self._cancel_complete}\n' | ||||
|                 f'  cancel_called_by_remote{ds}{self._cancel_called_by}\n' | ||||
|                 f'  cancel_called{ds}{self._cancel_called}\n' | ||||
|             ) | ||||
| 
 | ||||
|         if fmtstr: | ||||
|             fmtstr: str = textwrap.indent( | ||||
|                 text=fmtstr, | ||||
|                 prefix=' '*(1 + indent), | ||||
|             ) | ||||
| 
 | ||||
|         _repr: str = ( | ||||
|             '<Actor(\n' | ||||
|             + | ||||
|             fmtstr | ||||
|             + | ||||
|             ')>\n' | ||||
|             f'<{type(self).__name__}(\n' | ||||
|             f'{fmtstr}' | ||||
|             f')>\n' | ||||
|         ) | ||||
|         if indent: | ||||
|             _repr: str = textwrap.indent( | ||||
|  | @ -533,11 +582,11 @@ class Actor: | |||
|         queue. | ||||
| 
 | ||||
|         ''' | ||||
|         uid: tuple[str, str] = chan.uid | ||||
|         assert uid, f"`chan.uid` can't be {uid}" | ||||
|         aid: msgtypes.Aid = chan.aid | ||||
|         assert aid, f"`chan.aid` can't be {aid}" | ||||
|         try: | ||||
|             ctx: Context = self._contexts[( | ||||
|                 uid, | ||||
|                 aid.uid, | ||||
|                 cid, | ||||
| 
 | ||||
|                 # TODO: how to determine this tho? | ||||
|  | @ -548,7 +597,7 @@ class Actor: | |||
|                 'Ignoring invalid IPC msg!?\n' | ||||
|                 f'Ctx seems to not/no-longer exist??\n' | ||||
|                 f'\n' | ||||
|                 f'<=? {uid}\n' | ||||
|                 f'<=? {aid.reprol()!r}\n' | ||||
|                 f'  |_{pretty_struct.pformat(msg)}\n' | ||||
|             ) | ||||
|             match msg: | ||||
|  | @ -597,6 +646,7 @@ class Actor: | |||
|           msging session's lifetime. | ||||
| 
 | ||||
|         ''' | ||||
|         # ?TODO, use Aid here as well? | ||||
|         actor_uid = chan.uid | ||||
|         assert actor_uid | ||||
|         try: | ||||
|  | @ -945,6 +995,22 @@ class Actor: | |||
|             None,  # self cancel all rpc tasks | ||||
|         ) | ||||
| 
 | ||||
|     @property | ||||
|     def cancel_complete(self) -> bool: | ||||
|         return self._cancel_complete.is_set() | ||||
| 
 | ||||
|     @property | ||||
|     def cancel_called(self) -> bool: | ||||
|         ''' | ||||
|         Was this actor requested to cancel by a remote peer actor. | ||||
| 
 | ||||
|         ''' | ||||
|         return self._cancel_called_by is not None | ||||
| 
 | ||||
|     @property | ||||
|     def cancel_caller(self) -> msgtypes.Aid|None: | ||||
|         return self._cancel_called_by | ||||
| 
 | ||||
|     async def cancel( | ||||
|         self, | ||||
| 
 | ||||
|  | @ -969,20 +1035,18 @@ class Actor: | |||
| 
 | ||||
|         ''' | ||||
|         ( | ||||
|             requesting_uid, | ||||
|             requester_type, | ||||
|             requesting_aid,  # Aid | ||||
|             requester_type,  # str | ||||
|             req_chan, | ||||
|             log_meth, | ||||
|         ) = ( | ||||
|             req_chan.uid, | ||||
|             req_chan.aid, | ||||
|             'peer', | ||||
|             req_chan, | ||||
|             log.cancel, | ||||
| 
 | ||||
|         ) if req_chan else ( | ||||
| 
 | ||||
|             # a self cancel of ALL rpc tasks | ||||
|             self.uid, | ||||
|             self.aid, | ||||
|             'self', | ||||
|             self, | ||||
|             log.runtime, | ||||
|  | @ -990,14 +1054,14 @@ class Actor: | |||
|         # TODO: just use the new `Context.repr_rpc: str` (and | ||||
|         # other) repr fields instead of doing this all manual.. | ||||
|         msg: str = ( | ||||
|             f'Actor-runtime cancel request from {requester_type}\n\n' | ||||
|             f'<=c) {requesting_uid}\n' | ||||
|             f'  |_{self}\n' | ||||
|             f'Actor-runtime cancel request from {requester_type!r}\n' | ||||
|             f'\n' | ||||
|             f'<=c)\n' | ||||
|             f'{self}' | ||||
|         ) | ||||
| 
 | ||||
|         # TODO: what happens here when we self-cancel tho? | ||||
|         self._cancel_called_by_remote: tuple = requesting_uid | ||||
|         self._cancel_called_by: tuple = requesting_aid | ||||
|         self._cancel_called = True | ||||
| 
 | ||||
|         # cancel all ongoing rpc tasks | ||||
|  | @ -1025,7 +1089,7 @@ class Actor: | |||
| 
 | ||||
|             # self-cancel **all** ongoing RPC tasks | ||||
|             await self.cancel_rpc_tasks( | ||||
|                 req_uid=requesting_uid, | ||||
|                 req_aid=requesting_aid, | ||||
|                 parent_chan=None, | ||||
|             ) | ||||
| 
 | ||||
|  | @ -1054,8 +1118,7 @@ class Actor: | |||
|         self, | ||||
|         cid: str, | ||||
|         parent_chan: Channel, | ||||
|         requesting_uid: tuple[str, str]|None, | ||||
|         # ^^TODO! use the `Aid` directly here! | ||||
|         requesting_aid: msgtypes.Aid|None, | ||||
| 
 | ||||
|         ipc_msg: dict|None|bool = False, | ||||
| 
 | ||||
|  | @ -1093,7 +1156,7 @@ class Actor: | |||
|             log.runtime( | ||||
|                 'Cancel request for invalid RPC task.\n' | ||||
|                 'The task likely already completed or was never started!\n\n' | ||||
|                 f'<= canceller: {requesting_uid}\n' | ||||
|                 f'<= canceller: {requesting_aid}\n' | ||||
|                 f'=> {cid}@{parent_chan.uid}\n' | ||||
|                 f'  |_{parent_chan}\n' | ||||
|             ) | ||||
|  | @ -1101,7 +1164,7 @@ class Actor: | |||
| 
 | ||||
|         log.cancel( | ||||
|             'Rxed cancel request for RPC task\n' | ||||
|             f'{ctx._task!r} <=c) {requesting_uid}\n' | ||||
|             f'{ctx._task!r} <=c) {requesting_aid}\n' | ||||
|             f'|_>> {ctx.repr_rpc}\n' | ||||
| 
 | ||||
|             # f'|_{ctx._task}\n' | ||||
|  | @ -1127,9 +1190,9 @@ class Actor: | |||
|         ) | ||||
|         if ( | ||||
|             ctx._canceller is None | ||||
|             and requesting_uid | ||||
|             and requesting_aid | ||||
|         ): | ||||
|             ctx._canceller: tuple = requesting_uid | ||||
|             ctx._canceller: tuple = requesting_aid.uid | ||||
| 
 | ||||
|         # TODO: pack the RPC `{'cmd': <blah>}` msg into a ctxc and | ||||
|         # then raise and pack it here? | ||||
|  | @ -1155,7 +1218,7 @@ class Actor: | |||
| 
 | ||||
|         # wait for _invoke to mark the task complete | ||||
|         flow_info: str = ( | ||||
|             f'<= canceller: {requesting_uid}\n' | ||||
|             f'<= canceller: {requesting_aid}\n' | ||||
|             f'=> ipc-parent: {parent_chan}\n' | ||||
|             f'|_{ctx}\n' | ||||
|         ) | ||||
|  | @ -1172,7 +1235,7 @@ class Actor: | |||
| 
 | ||||
|     async def cancel_rpc_tasks( | ||||
|         self, | ||||
|         req_uid: tuple[str, str], | ||||
|         req_aid: msgtypes.Aid, | ||||
| 
 | ||||
|         # NOTE: when None is passed we cancel **all** rpc | ||||
|         # tasks running in this actor! | ||||
|  | @ -1189,7 +1252,7 @@ class Actor: | |||
|         if not tasks: | ||||
|             log.runtime( | ||||
|                 'Actor has no cancellable RPC tasks?\n' | ||||
|                 f'<= canceller: {req_uid}\n' | ||||
|                 f'<= canceller: {req_aid.reprol()}\n' | ||||
|             ) | ||||
|             return | ||||
| 
 | ||||
|  | @ -1229,7 +1292,7 @@ class Actor: | |||
|         ) | ||||
|         log.cancel( | ||||
|             f'Cancelling {descr} RPC tasks\n\n' | ||||
|             f'<=c) {req_uid} [canceller]\n' | ||||
|             f'<=c) {req_aid} [canceller]\n' | ||||
|             f'{rent_chan_repr}' | ||||
|             f'c)=> {self.uid} [cancellee]\n' | ||||
|             f'  |_{self} [with {len(tasks)} tasks]\n' | ||||
|  | @ -1257,7 +1320,7 @@ class Actor: | |||
|             await self._cancel_task( | ||||
|                 cid, | ||||
|                 task_caller_chan, | ||||
|                 requesting_uid=req_uid, | ||||
|                 requesting_aid=req_aid, | ||||
|             ) | ||||
| 
 | ||||
|         if tasks: | ||||
|  | @ -1554,8 +1617,9 @@ async def async_main( | |||
|                     # 'Blocking on service nursery to exit..\n' | ||||
|                 ) | ||||
|             log.runtime( | ||||
|                 "Service nursery complete\n" | ||||
|                 "Waiting on root nursery to complete" | ||||
|                 'Service nursery complete\n' | ||||
|                 '\n' | ||||
|                 '-> Waiting on root nursery to complete' | ||||
|             ) | ||||
| 
 | ||||
|         # Blocks here as expected until the root nursery is | ||||
|  | @ -1705,7 +1769,7 @@ async def async_main( | |||
|     ) | ||||
|     teardown_report += ( | ||||
|         'Actor runtime exited\n' | ||||
|         f'{op_nested_actor_repr}\n' | ||||
|         f'{op_nested_actor_repr}' | ||||
|     ) | ||||
|     log.info(teardown_report) | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue