Refine `Actor` status iface, use `Aid` throughout

To simplify `.pformat()` output when the new `privates: bool` is unset
(the default) this adds new public attrs to wrap an actor's
cancellation status as well as provide a `.repr_state: str` (similar to
our equiv on `Context`). Rework `.pformat()` to render a much simplified
repr using all these new refinements.

Further, port the `.cancel()` method to use `.msg.types.Aid` for all
internal `requesting_uid` refs (now renamed with `_aid`) and in all
called downstream methods.

New cancel-state iface deats,
- rename `._cancel_called_by_remote` -> `._cancel_called_by` and expect
  it to be set as an `Aid`.
- add `.cancel_complete: bool` which flags whether `.cancel()` ran to
  completion.
- add `.cancel_called: bool` which just wraps `._cancel_called` (and
  which likely will just be dropped since we already have
  `._cancel_called_by`).
- add `.cancel_caller: Aid|None` which wraps `._cancel_called_by`.

In terms of using `Aid` in cancel methods,
- rename vars with `_aid` suffix in `.cancel()` (and wherever else).
- change `.cancel_rpc_tasks()` input param to `req_aid: msgtypes.Aid`.
- do the same for `._cancel_task()` and (for now until we adjust its
  internals as well) use the `Aid.uid` remap property when assigning
  `Context._canceller`.
- adjust all log msg refs to match obvi.
enable_tpts
Tyler Goodlet 2025-06-23 17:33:54 -04:00
parent 25f3cf795d
commit 49c61e40c7
1 changed files with 145 additions and 81 deletions

View File

@ -234,7 +234,7 @@ class Actor:
# state # state
self._cancel_complete = trio.Event() self._cancel_complete = trio.Event()
self._cancel_called_by_remote: tuple[str, tuple]|None = None self._cancel_called_by: tuple[str, tuple]|None = None
self._cancel_called: bool = False self._cancel_called: bool = False
# retreive and store parent `__main__` data which # retreive and store parent `__main__` data which
@ -346,69 +346,118 @@ class Actor:
def pid(self) -> int: def pid(self) -> int:
return self._aid.pid return self._aid.pid
@property
def repr_state(self) -> str:
if self.cancel_complete:
return 'cancelled'
elif canceller := self.cancel_caller:
return f' and cancel-called by {canceller}'
else:
return 'running'
def pformat( def pformat(
self, self,
ds: str = ': ', ds: str = ': ',
indent: int = 0, indent: int = 0,
privates: bool = False,
) -> str: ) -> str:
fields_sect_prefix: str = ' |_'
parent_uid: tuple|None = None fmtstr: str = f'|_id: {self.aid.reprol()!r}\n'
if privates:
aid_nest_prefix: str = '|_aid='
aid_field_repr: str = _pformat.nest_from_op(
input_op='',
text=pretty_struct.pformat(
struct=self.aid,
field_indent=2,
),
op_suffix='',
nest_prefix=aid_nest_prefix,
nest_indent=0,
)
fmtstr: str = f'{aid_field_repr}'
if rent_chan := self._parent_chan: if rent_chan := self._parent_chan:
parent_uid = rent_chan.uid fmtstr += (
f"|_parent{ds}{rent_chan.aid.reprol()}\n"
)
peers: list = []
server: _server.IPCServer = self.ipc_server server: _server.IPCServer = self.ipc_server
ipc_server_sect: str = ''
if server: if server:
peers: list[tuple] = list(server._peer_connected) if privates:
server_repr: str = self._ipc_server.pformat(
privates=privates,
)
# create field ln as a key-header indented under
# and up to the section's key prefix.
# ^XXX if we were to indent `repr(Server)` to
# '<key>: '
# _here_^
server_repr: str = _pformat.nest_from_op(
input_op='', # nest as sub-obj
op_suffix='',
text=server_repr,
)
fmtstr += (
f"{server_repr}"
)
else:
fmtstr += (
f'|_ipc: {server.repr_state!r}\n'
)
# create field ln as a key-header indented under fmtstr += (
# and up to the section's key prefix. f'|_rpc: {len(self._rpc_tasks)} active tasks\n'
# field_ln_header: str = textwrap.indent(
# text=f"ipc_server{ds}",
# prefix=' '*len(fields_sect_prefix),
# )
# ^XXX if we were to indent `repr(Server)` to
# '<key>: '
# _here_^
server_repr: str = textwrap.indent(
text=self._ipc_server.pformat(),
# prefix=' '*len(field_ln_header),
prefix=' '*len(fields_sect_prefix),
)
ipc_server_sect: str = (
# f'{field_ln_header}\n'
f'{server_repr}'
)
fmtstr: str = (
f' |_id: {self.aid!r}\n'
# f" aid{ds}{self.aid!r}\n"
f" parent{ds}{parent_uid}\n"
# f'\n'
f' |_ipc: {len(peers)!r} connected peers\n'
f" peers{ds}{peers!r}\n"
f"{ipc_server_sect}"
# f'\n'
f' |_rpc: {len(self._rpc_tasks)} tasks\n'
f" ctxs{ds}{len(self._contexts)}\n"
# f'\n'
f' |_runtime: ._task{ds}{self._task!r}\n'
f' _spawn_method{ds}{self._spawn_method}\n'
f' _actoruid2nursery{ds}{self._actoruid2nursery}\n'
f' _forkserver_info{ds}{self._forkserver_info}\n'
# f'\n'
f' |_state: "TODO: .repr_state()"\n'
f' _cancel_complete{ds}{self._cancel_complete}\n'
f' _cancel_called_by_remote{ds}{self._cancel_called_by_remote}\n'
f' _cancel_called{ds}{self._cancel_called}\n'
) )
# TODO, actually fix the .repr_state impl/output?
# append ipc-ctx state summary
# ctxs: dict = self._contexts
# if ctxs:
# ctx_states: dict[str, int] = {}
# for ctx in self._contexts.values():
# ctx_state: str = ctx.repr_state
# cnt = ctx_states.setdefault(ctx_state, 0)
# ctx_states[ctx_state] = cnt + 1
# fmtstr += (
# f" ctxs{ds}{ctx_states}\n"
# )
# runtime-state
task_name: str = '<dne>'
if task := self._task:
task_name: str = task.name
fmtstr += (
# TODO, this just like ctx?
f'|_state: {self.repr_state!r}\n'
f' task: {task_name}\n'
f' loglevel: {self.loglevel!r}\n'
f' subactors_spawned: {len(self._actoruid2nursery)}\n'
)
if not _state.is_root_process():
fmtstr += f' spawn_method: {self._spawn_method!r}\n'
if privates:
fmtstr += (
# f' actoruid2nursery{ds}{self._actoruid2nursery}\n'
f' cancel_complete{ds}{self._cancel_complete}\n'
f' cancel_called_by_remote{ds}{self._cancel_called_by}\n'
f' cancel_called{ds}{self._cancel_called}\n'
)
if fmtstr:
fmtstr: str = textwrap.indent(
text=fmtstr,
prefix=' '*(1 + indent),
)
_repr: str = ( _repr: str = (
'<Actor(\n' f'<{type(self).__name__}(\n'
+ f'{fmtstr}'
fmtstr f')>\n'
+
')>\n'
) )
if indent: if indent:
_repr: str = textwrap.indent( _repr: str = textwrap.indent(
@ -533,11 +582,11 @@ class Actor:
queue. queue.
''' '''
uid: tuple[str, str] = chan.uid aid: msgtypes.Aid = chan.aid
assert uid, f"`chan.uid` can't be {uid}" assert aid, f"`chan.aid` can't be {aid}"
try: try:
ctx: Context = self._contexts[( ctx: Context = self._contexts[(
uid, aid.uid,
cid, cid,
# TODO: how to determine this tho? # TODO: how to determine this tho?
@ -548,7 +597,7 @@ class Actor:
'Ignoring invalid IPC msg!?\n' 'Ignoring invalid IPC msg!?\n'
f'Ctx seems to not/no-longer exist??\n' f'Ctx seems to not/no-longer exist??\n'
f'\n' f'\n'
f'<=? {uid}\n' f'<=? {aid.reprol()!r}\n'
f' |_{pretty_struct.pformat(msg)}\n' f' |_{pretty_struct.pformat(msg)}\n'
) )
match msg: match msg:
@ -597,6 +646,7 @@ class Actor:
msging session's lifetime. msging session's lifetime.
''' '''
# ?TODO, use Aid here as well?
actor_uid = chan.uid actor_uid = chan.uid
assert actor_uid assert actor_uid
try: try:
@ -945,6 +995,22 @@ class Actor:
None, # self cancel all rpc tasks None, # self cancel all rpc tasks
) )
@property
def cancel_complete(self) -> bool:
return self._cancel_complete.is_set()
@property
def cancel_called(self) -> bool:
'''
Was this actor requested to cancel by a remote peer actor.
'''
return self._cancel_called_by is not None
@property
def cancel_caller(self) -> msgtypes.Aid|None:
return self._cancel_called_by
async def cancel( async def cancel(
self, self,
@ -969,20 +1035,18 @@ class Actor:
''' '''
( (
requesting_uid, requesting_aid, # Aid
requester_type, requester_type, # str
req_chan, req_chan,
log_meth, log_meth,
) = ( ) = (
req_chan.uid, req_chan.aid,
'peer', 'peer',
req_chan, req_chan,
log.cancel, log.cancel,
) if req_chan else ( ) if req_chan else (
# a self cancel of ALL rpc tasks # a self cancel of ALL rpc tasks
self.uid, self.aid,
'self', 'self',
self, self,
log.runtime, log.runtime,
@ -990,14 +1054,14 @@ class Actor:
# TODO: just use the new `Context.repr_rpc: str` (and # TODO: just use the new `Context.repr_rpc: str` (and
# other) repr fields instead of doing this all manual.. # other) repr fields instead of doing this all manual..
msg: str = ( msg: str = (
f'Actor-runtime cancel request from {requester_type}\n\n' f'Actor-runtime cancel request from {requester_type!r}\n'
f'<=c) {requesting_uid}\n'
f' |_{self}\n'
f'\n' f'\n'
f'<=c)\n'
f'{self}'
) )
# TODO: what happens here when we self-cancel tho? # TODO: what happens here when we self-cancel tho?
self._cancel_called_by_remote: tuple = requesting_uid self._cancel_called_by: tuple = requesting_aid
self._cancel_called = True self._cancel_called = True
# cancel all ongoing rpc tasks # cancel all ongoing rpc tasks
@ -1025,7 +1089,7 @@ class Actor:
# self-cancel **all** ongoing RPC tasks # self-cancel **all** ongoing RPC tasks
await self.cancel_rpc_tasks( await self.cancel_rpc_tasks(
req_uid=requesting_uid, req_aid=requesting_aid,
parent_chan=None, parent_chan=None,
) )
@ -1054,8 +1118,7 @@ class Actor:
self, self,
cid: str, cid: str,
parent_chan: Channel, parent_chan: Channel,
requesting_uid: tuple[str, str]|None, requesting_aid: msgtypes.Aid|None,
# ^^TODO! use the `Aid` directly here!
ipc_msg: dict|None|bool = False, ipc_msg: dict|None|bool = False,
@ -1093,7 +1156,7 @@ class Actor:
log.runtime( log.runtime(
'Cancel request for invalid RPC task.\n' 'Cancel request for invalid RPC task.\n'
'The task likely already completed or was never started!\n\n' 'The task likely already completed or was never started!\n\n'
f'<= canceller: {requesting_uid}\n' f'<= canceller: {requesting_aid}\n'
f'=> {cid}@{parent_chan.uid}\n' f'=> {cid}@{parent_chan.uid}\n'
f' |_{parent_chan}\n' f' |_{parent_chan}\n'
) )
@ -1101,7 +1164,7 @@ class Actor:
log.cancel( log.cancel(
'Rxed cancel request for RPC task\n' 'Rxed cancel request for RPC task\n'
f'{ctx._task!r} <=c) {requesting_uid}\n' f'{ctx._task!r} <=c) {requesting_aid}\n'
f'|_>> {ctx.repr_rpc}\n' f'|_>> {ctx.repr_rpc}\n'
# f'|_{ctx._task}\n' # f'|_{ctx._task}\n'
@ -1127,9 +1190,9 @@ class Actor:
) )
if ( if (
ctx._canceller is None ctx._canceller is None
and requesting_uid and requesting_aid
): ):
ctx._canceller: tuple = requesting_uid ctx._canceller: tuple = requesting_aid.uid
# TODO: pack the RPC `{'cmd': <blah>}` msg into a ctxc and # TODO: pack the RPC `{'cmd': <blah>}` msg into a ctxc and
# then raise and pack it here? # then raise and pack it here?
@ -1155,7 +1218,7 @@ class Actor:
# wait for _invoke to mark the task complete # wait for _invoke to mark the task complete
flow_info: str = ( flow_info: str = (
f'<= canceller: {requesting_uid}\n' f'<= canceller: {requesting_aid}\n'
f'=> ipc-parent: {parent_chan}\n' f'=> ipc-parent: {parent_chan}\n'
f'|_{ctx}\n' f'|_{ctx}\n'
) )
@ -1172,7 +1235,7 @@ class Actor:
async def cancel_rpc_tasks( async def cancel_rpc_tasks(
self, self,
req_uid: tuple[str, str], req_aid: msgtypes.Aid,
# NOTE: when None is passed we cancel **all** rpc # NOTE: when None is passed we cancel **all** rpc
# tasks running in this actor! # tasks running in this actor!
@ -1189,7 +1252,7 @@ class Actor:
if not tasks: if not tasks:
log.runtime( log.runtime(
'Actor has no cancellable RPC tasks?\n' 'Actor has no cancellable RPC tasks?\n'
f'<= canceller: {req_uid}\n' f'<= canceller: {req_aid.reprol()}\n'
) )
return return
@ -1229,7 +1292,7 @@ class Actor:
) )
log.cancel( log.cancel(
f'Cancelling {descr} RPC tasks\n\n' f'Cancelling {descr} RPC tasks\n\n'
f'<=c) {req_uid} [canceller]\n' f'<=c) {req_aid} [canceller]\n'
f'{rent_chan_repr}' f'{rent_chan_repr}'
f'c)=> {self.uid} [cancellee]\n' f'c)=> {self.uid} [cancellee]\n'
f' |_{self} [with {len(tasks)} tasks]\n' f' |_{self} [with {len(tasks)} tasks]\n'
@ -1257,7 +1320,7 @@ class Actor:
await self._cancel_task( await self._cancel_task(
cid, cid,
task_caller_chan, task_caller_chan,
requesting_uid=req_uid, requesting_aid=req_aid,
) )
if tasks: if tasks:
@ -1554,8 +1617,9 @@ async def async_main(
# 'Blocking on service nursery to exit..\n' # 'Blocking on service nursery to exit..\n'
) )
log.runtime( log.runtime(
"Service nursery complete\n" 'Service nursery complete\n'
"Waiting on root nursery to complete" '\n'
'-> Waiting on root nursery to complete'
) )
# Blocks here as expected until the root nursery is # Blocks here as expected until the root nursery is
@ -1705,7 +1769,7 @@ async def async_main(
) )
teardown_report += ( teardown_report += (
'Actor runtime exited\n' 'Actor runtime exited\n'
f'{op_nested_actor_repr}\n' f'{op_nested_actor_repr}'
) )
log.info(teardown_report) log.info(teardown_report)