From b5e3fa73705434d5e62735ccdcc31073236c2a2e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 12 Jun 2025 23:26:38 -0400 Subject: [PATCH 01/25] Use `nest_from_op()` in some runtime logs for actor-state-repring --- tractor/_root.py | 9 +++++-- tractor/_runtime.py | 63 ++++++++++++++++++++++++++++++++++++--------- 2 files changed, 58 insertions(+), 14 deletions(-) diff --git a/tractor/_root.py b/tractor/_root.py index 82bec667..1818283a 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -47,6 +47,7 @@ from ._runtime import ( from .devx import ( debug, _frame_stack, + pformat as _pformat, ) from . import _spawn from . import _state @@ -518,10 +519,14 @@ async def open_root_actor( # for an in nurseries: # tempn.start_soon(an.exited.wait) + op_nested_actor_repr: str = _pformat.nest_from_op( + input_op='>) ', + tree_str=actor.pformat(), + nest_prefix='|_', + ) logger.info( f'Closing down root actor\n' - f'>)\n' - f'|_{actor}\n' + f'{op_nested_actor_repr}\n' ) await actor.cancel(None) # self cancel finally: diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 758e5685..e7475662 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -55,6 +55,7 @@ from typing import ( TYPE_CHECKING, ) import uuid +import textwrap from types import ModuleType import warnings @@ -97,7 +98,10 @@ from ._exceptions import ( MsgTypeError, unpack_error, ) -from .devx import debug +from .devx import ( + debug, + pformat as _pformat +) from ._discovery import get_registry from ._portal import Portal from . 
import _state @@ -339,46 +343,76 @@ class Actor: def pid(self) -> int: return self._aid.pid - def pformat(self) -> str: - ds: str = '=' + def pformat( + self, + ds: str = ':', + indent: int = 0, + ) -> str: + fields_sect_prefix: str = ' |_' parent_uid: tuple|None = None if rent_chan := self._parent_chan: parent_uid = rent_chan.uid peers: list = [] server: _server.IPCServer = self.ipc_server + ipc_server_sect: str = '' if server: peers: list[tuple] = list(server._peer_connected) + # create field ln as a key-header indented under + # and up to the section's key prefix. + # field_ln_header: str = textwrap.indent( + # text=f"ipc_server{ds}", + # prefix=' '*len(fields_sect_prefix), + # ) + # ^XXX if we were to indent `repr(Server)` to + # ': ' + # _here_^ + server_repr: str = textwrap.indent( + text=self._ipc_server.pformat(), + # prefix=' '*len(field_ln_header), + prefix=' '*len(fields_sect_prefix), + ) + ipc_server_sect: str = ( + # f'{field_ln_header}\n' + f'{server_repr}' + ) + fmtstr: str = ( f' |_id: {self.aid!r}\n' # f" aid{ds}{self.aid!r}\n" f" parent{ds}{parent_uid}\n" - f'\n' + # f'\n' f' |_ipc: {len(peers)!r} connected peers\n' f" peers{ds}{peers!r}\n" - f" ipc_server{ds}{self._ipc_server}\n" - f'\n' + f"{ipc_server_sect}" + # f'\n' f' |_rpc: {len(self._rpc_tasks)} tasks\n' f" ctxs{ds}{len(self._contexts)}\n" - f'\n' + # f'\n' f' |_runtime: ._task{ds}{self._task!r}\n' f' _spawn_method{ds}{self._spawn_method}\n' f' _actoruid2nursery{ds}{self._actoruid2nursery}\n' f' _forkserver_info{ds}{self._forkserver_info}\n' - f'\n' + # f'\n' f' |_state: "TODO: .repr_state()"\n' f' _cancel_complete{ds}{self._cancel_complete}\n' f' _cancel_called_by_remote{ds}{self._cancel_called_by_remote}\n' f' _cancel_called{ds}{self._cancel_called}\n' ) - return ( + _repr: str = ( '\n' ) + if indent: + _repr: str = textwrap.indent( + text=_repr, + prefix=' '*indent, + ) + return _repr __repr__ = pformat @@ -1654,10 +1688,15 @@ async def async_main( '-> All peer channels are complete\n' 
) + op_nested_actor_repr: str = _pformat.nest_from_op( + input_op=')> ', + tree_str=actor.pformat(), + nest_prefix='|_', + back_from_op=2, + ) teardown_report += ( - 'Actor runtime exiting\n' - f'>)\n' - f'|_{actor}\n' + 'Actor runtime exited\n' + f'{op_nested_actor_repr}\n' ) log.info(teardown_report) From 547cf5a21017200f675bf7a2d747f861141f6ae1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 15 Jun 2025 22:04:01 -0400 Subject: [PATCH 02/25] Drop stale comment from inter-peer suite --- tests/test_inter_peer_cancellation.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py index 25935df2..b6d469d9 100644 --- a/tests/test_inter_peer_cancellation.py +++ b/tests/test_inter_peer_cancellation.py @@ -410,7 +410,6 @@ def test_peer_canceller( ''' async def main(): async with tractor.open_nursery( - # NOTE: to halt the peer tasks on ctxc, uncomment this. debug_mode=debug_mode, ) as an: canceller: Portal = await an.start_actor( From e9f2fecd6613fb6830b5e4257a1690223d431fdc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Jul 2025 17:51:45 -0400 Subject: [PATCH 03/25] Fix `nest_from_op()` call sigs, already changed upstream In `._runtime` and `._root`, since the latest fn-signature changes already landed on the main branch via 65b7956 (the #384 patch).
--- tractor/_root.py | 2 +- tractor/_runtime.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tractor/_root.py b/tractor/_root.py index 1818283a..88347132 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -521,7 +521,7 @@ async def open_root_actor( op_nested_actor_repr: str = _pformat.nest_from_op( input_op='>) ', - tree_str=actor.pformat(), + text=actor.pformat(), nest_prefix='|_', ) logger.info( diff --git a/tractor/_runtime.py b/tractor/_runtime.py index e7475662..1c032eb5 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -1690,9 +1690,9 @@ async def async_main( op_nested_actor_repr: str = _pformat.nest_from_op( input_op=')> ', - tree_str=actor.pformat(), + text=actor.pformat(), nest_prefix='|_', - back_from_op=2, + nest_indent=2, ) teardown_report += ( 'Actor runtime exited\n' From a8428d7de3c442ada405a5c8ea4b1ce987e18b36 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 22 Jun 2025 21:55:37 -0400 Subject: [PATCH 04/25] Extend `.msg.types.Aid` method interface Providing the legacy `.uid -> tuple` style id (since still used for the `Actor._contexts` table) and a `repr-one-line` method `.reprol() -> str` for rendering a compact unique actor ID summary (useful in logging/.pformat()s at the least). --- tractor/msg/types.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/tractor/msg/types.py b/tractor/msg/types.py index aaf8d137..f077f132 100644 --- a/tractor/msg/types.py +++ b/tractor/msg/types.py @@ -154,6 +154,29 @@ class Aid( # should also include at least `.pid` (equiv to port for tcp) # and/or host-part always? + @property + def uid(self) -> tuple[str, str]: + ''' + Legacy actor "unique-id" pair format. 
+ + ''' + return ( + self.name, + self.uuid, + ) + + def reprol( + self, + sin_uuid: bool = True, + ) -> str: + if not sin_uuid: + return ( + f'{self.name}[{self.uuid[:6]}]@{self.pid!r}' + ) + return ( + f'{self.name}@{self.pid!r}' + ) + class SpawnSpec( pretty_struct.Struct, From 679d9991858497d006e4a660680e07beb9722f8f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 23 Jun 2025 12:08:05 -0400 Subject: [PATCH 05/25] Add flag to toggle private vars in `Channel.pformat()` Call it `privates: bool` and only show certain internal instance vars when set in the `repr()` output. --- tractor/ipc/_chan.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/tractor/ipc/_chan.py b/tractor/ipc/_chan.py index 2c3374c2..dd7b520f 100644 --- a/tractor/ipc/_chan.py +++ b/tractor/ipc/_chan.py @@ -196,9 +196,12 @@ class Channel: self._transport.codec = orig # TODO: do a .src/.dst: str for maddrs? - def pformat(self) -> str: + def pformat( + self, + privates: bool = False, + ) -> str: if not self._transport: - return '' + return '' tpt: MsgTransport = self._transport tpt_name: str = type(tpt).__name__ @@ -206,14 +209,17 @@ class Channel: 'connected' if self.connected() else 'closed' ) - return ( + repr_str: str = ( f'' + ) + ( f' |_msgstream: {tpt_name}\n' f' proto={tpt.laddr.proto_key!r}\n' f' layer={tpt.layer_key!r}\n' @@ -223,9 +229,13 @@ class Channel: f' stream={tpt.stream}\n' f' maddr={tpt.maddr!r}\n' f' drained={tpt.drained}\n' + ) + ( f' _send_lock={tpt._send_lock.statistics()}\n' - f')>\n' + if privates else '' + ) + ( + ')>\n' ) + return repr_str # NOTE: making this return a value that can be passed to # `eval()` is entirely **optional** FYI! 
From 808a33650844e85261687f73c481869cf1a8da46 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 23 Jun 2025 17:33:54 -0400 Subject: [PATCH 06/25] Refine `Actor` status iface, use `Aid` throughout To simplify `.pformat()` output when the new `privates: bool` is unset (the default) this adds new public attrs to wrap an actor's cancellation status as well as provide a `.repr_state: str` (similar to our equiv on `Context`). Rework `.pformat()` to render a much simplified repr using all these new refinements. Further, port the `.cancel()` method to use `.msg.types.Aid` for all internal `requesting_uid` refs (now renamed with `_aid`) and in all called downstream methods. New cancel-state iface deats, - rename `._cancel_called_by_remote` -> `._cancel_called_by` and expect it to be set as an `Aid`. - add `.cancel_complete: bool` which flags whether `.cancel()` ran to completion. - add `.cancel_called: bool` which just wraps `._cancel_called` (and which likely will just be dropped since we already have `._cancel_called_by`). - add `.cancel_caller: Aid|None` which wraps `._cancel_called_by`. In terms of using `Aid` in cancel methods, - rename vars with `_aid` suffix in `.cancel()` (and wherever else). - change `.cancel_rpc_tasks()` input param to `req_aid: msgtypes.Aid`. - do the same for `._cancel_task()` and (for now until we adjust its internals as well) use the `Aid.uid` remap property when assigning `Context._canceller`. - adjust all log msg refs to match obvi. 
--- tractor/_runtime.py | 232 ++++++++++++++++++++++++++++---------------- 1 file changed, 150 insertions(+), 82 deletions(-) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 1c032eb5..95348e86 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -231,7 +231,7 @@ class Actor: # state self._cancel_complete = trio.Event() - self._cancel_called_by_remote: tuple[str, tuple]|None = None + self._cancel_called_by: tuple[str, tuple]|None = None self._cancel_called: bool = False # retreive and store parent `__main__` data which @@ -343,69 +343,118 @@ class Actor: def pid(self) -> int: return self._aid.pid + @property + def repr_state(self) -> str: + if self.cancel_complete: + return 'cancelled' + + elif canceller := self.cancel_caller: + return f' and cancel-called by {canceller}' + + else: + return 'running' + def pformat( self, ds: str = ':', indent: int = 0, + privates: bool = False, ) -> str: - fields_sect_prefix: str = ' |_' - parent_uid: tuple|None = None + + fmtstr: str = f'|_id: {self.aid.reprol()!r}\n' + if privates: + aid_nest_prefix: str = '|_aid=' + aid_field_repr: str = _pformat.nest_from_op( + input_op='', + text=pretty_struct.pformat( + struct=self.aid, + field_indent=2, + ), + op_suffix='', + nest_prefix=aid_nest_prefix, + nest_indent=0, + ) + fmtstr: str = f'{aid_field_repr}' + if rent_chan := self._parent_chan: - parent_uid = rent_chan.uid + fmtstr += ( + f"|_parent{ds}{rent_chan.aid.reprol()}\n" + ) - peers: list = [] server: _server.IPCServer = self.ipc_server - ipc_server_sect: str = '' if server: - peers: list[tuple] = list(server._peer_connected) + if privates: + server_repr: str = self._ipc_server.pformat( + privates=privates, + ) + # create field ln as a key-header indented under + # and up to the section's key prefix. 
+ # ^XXX if we were to indent `repr(Server)` to + # ': ' + # _here_^ + server_repr: str = _pformat.nest_from_op( + input_op='', # nest as sub-obj + op_suffix='', + text=server_repr, + ) + fmtstr += ( + f"{server_repr}" + ) + else: + fmtstr += ( + f'|_ipc: {server.repr_state!r}\n' + ) - # create field ln as a key-header indented under - # and up to the section's key prefix. - # field_ln_header: str = textwrap.indent( - # text=f"ipc_server{ds}", - # prefix=' '*len(fields_sect_prefix), - # ) - # ^XXX if we were to indent `repr(Server)` to - # ': ' - # _here_^ - server_repr: str = textwrap.indent( - text=self._ipc_server.pformat(), - # prefix=' '*len(field_ln_header), - prefix=' '*len(fields_sect_prefix), - ) - ipc_server_sect: str = ( - # f'{field_ln_header}\n' - f'{server_repr}' - ) - - fmtstr: str = ( - f' |_id: {self.aid!r}\n' - # f" aid{ds}{self.aid!r}\n" - f" parent{ds}{parent_uid}\n" - # f'\n' - f' |_ipc: {len(peers)!r} connected peers\n' - f" peers{ds}{peers!r}\n" - f"{ipc_server_sect}" - # f'\n' - f' |_rpc: {len(self._rpc_tasks)} tasks\n' - f" ctxs{ds}{len(self._contexts)}\n" - # f'\n' - f' |_runtime: ._task{ds}{self._task!r}\n' - f' _spawn_method{ds}{self._spawn_method}\n' - f' _actoruid2nursery{ds}{self._actoruid2nursery}\n' - f' _forkserver_info{ds}{self._forkserver_info}\n' - # f'\n' - f' |_state: "TODO: .repr_state()"\n' - f' _cancel_complete{ds}{self._cancel_complete}\n' - f' _cancel_called_by_remote{ds}{self._cancel_called_by_remote}\n' - f' _cancel_called{ds}{self._cancel_called}\n' + fmtstr += ( + f'|_rpc: {len(self._rpc_tasks)} active tasks\n' ) + + # TODO, actually fix the .repr_state impl/output? 
+ # append ipc-ctx state summary + # ctxs: dict = self._contexts + # if ctxs: + # ctx_states: dict[str, int] = {} + # for ctx in self._contexts.values(): + # ctx_state: str = ctx.repr_state + # cnt = ctx_states.setdefault(ctx_state, 0) + # ctx_states[ctx_state] = cnt + 1 + + # fmtstr += ( + # f" ctxs{ds}{ctx_states}\n" + # ) + + # runtime-state + task_name: str = '' + if task := self._task: + task_name: str = task.name + fmtstr += ( + # TODO, this just like ctx? + f'|_state: {self.repr_state!r}\n' + f' task: {task_name}\n' + f' loglevel: {self.loglevel!r}\n' + f' subactors_spawned: {len(self._actoruid2nursery)}\n' + ) + if not _state.is_root_process(): + fmtstr += f' spawn_method: {self._spawn_method!r}\n' + + if privates: + fmtstr += ( + # f' actoruid2nursery{ds}{self._actoruid2nursery}\n' + f' cancel_complete{ds}{self._cancel_complete}\n' + f' cancel_called_by_remote{ds}{self._cancel_called_by}\n' + f' cancel_called{ds}{self._cancel_called}\n' + ) + + if fmtstr: + fmtstr: str = textwrap.indent( + text=fmtstr, + prefix=' '*(1 + indent), + ) + _repr: str = ( - '\n' + f'<{type(self).__name__}(\n' + f'{fmtstr}' + f')>\n' ) if indent: _repr: str = textwrap.indent( @@ -530,11 +579,11 @@ class Actor: queue. ''' - uid: tuple[str, str] = chan.uid - assert uid, f"`chan.uid` can't be {uid}" + aid: msgtypes.Aid = chan.aid + assert aid, f"`chan.aid` can't be {aid}" try: ctx: Context = self._contexts[( - uid, + aid.uid, cid, # TODO: how to determine this tho? @@ -545,7 +594,7 @@ class Actor: 'Ignoring invalid IPC msg!?\n' f'Ctx seems to not/no-longer exist??\n' f'\n' - f'<=? {uid}\n' + f'<=? {aid.reprol()!r}\n' f' |_{pretty_struct.pformat(msg)}\n' ) match msg: @@ -594,6 +643,7 @@ class Actor: msging session's lifetime. ''' + # ?TODO, use Aid here as well? 
actor_uid = chan.uid assert actor_uid try: @@ -942,6 +992,22 @@ class Actor: None, # self cancel all rpc tasks ) + @property + def cancel_complete(self) -> bool: + return self._cancel_complete.is_set() + + @property + def cancel_called(self) -> bool: + ''' + Was this actor requested to cancel by a remote peer actor. + + ''' + return self._cancel_called_by is not None + + @property + def cancel_caller(self) -> msgtypes.Aid|None: + return self._cancel_called_by + async def cancel( self, @@ -966,20 +1032,18 @@ class Actor: ''' ( - requesting_uid, - requester_type, + requesting_aid, # Aid + requester_type, # str req_chan, log_meth, ) = ( - req_chan.uid, + req_chan.aid, 'peer', req_chan, log.cancel, - ) if req_chan else ( - # a self cancel of ALL rpc tasks - self.uid, + self.aid, 'self', self, log.runtime, @@ -987,14 +1051,14 @@ class Actor: # TODO: just use the new `Context.repr_rpc: str` (and # other) repr fields instead of doing this all manual.. msg: str = ( - f'Actor-runtime cancel request from {requester_type}\n\n' - f'<=c) {requesting_uid}\n' - f' |_{self}\n' + f'Actor-runtime cancel request from {requester_type!r}\n' f'\n' + f'<=c)\n' + f'{self}' ) # TODO: what happens here when we self-cancel tho? 
- self._cancel_called_by_remote: tuple = requesting_uid + self._cancel_called_by: tuple = requesting_aid self._cancel_called = True # cancel all ongoing rpc tasks @@ -1022,7 +1086,7 @@ class Actor: # self-cancel **all** ongoing RPC tasks await self.cancel_rpc_tasks( - req_uid=requesting_uid, + req_aid=requesting_aid, parent_chan=None, ) @@ -1051,7 +1115,7 @@ class Actor: self, cid: str, parent_chan: Channel, - requesting_uid: tuple[str, str]|None, + requesting_aid: msgtypes.Aid|None, ipc_msg: dict|None|bool = False, @@ -1089,7 +1153,7 @@ class Actor: log.runtime( 'Cancel request for invalid RPC task.\n' 'The task likely already completed or was never started!\n\n' - f'<= canceller: {requesting_uid}\n' + f'<= canceller: {requesting_aid}\n' f'=> {cid}@{parent_chan.uid}\n' f' |_{parent_chan}\n' ) @@ -1097,9 +1161,12 @@ class Actor: log.cancel( 'Rxed cancel request for RPC task\n' - f'<=c) {requesting_uid}\n' - f' |_{ctx._task}\n' - f' >> {ctx.repr_rpc}\n' + f'{ctx._task!r} <=c) {requesting_aid}\n' + f'|_>> {ctx.repr_rpc}\n' + + # f'|_{ctx._task}\n' + # f' >> {ctx.repr_rpc}\n' + # f'=> {ctx._task}\n' # f' >> Actor._cancel_task() => {ctx._task}\n' # f' |_ {ctx._task}\n\n' @@ -1120,9 +1187,9 @@ class Actor: ) if ( ctx._canceller is None - and requesting_uid + and requesting_aid ): - ctx._canceller: tuple = requesting_uid + ctx._canceller: tuple = requesting_aid.uid # TODO: pack the RPC `{'cmd': }` msg into a ctxc and # then raise and pack it here? @@ -1148,7 +1215,7 @@ class Actor: # wait for _invoke to mark the task complete flow_info: str = ( - f'<= canceller: {requesting_uid}\n' + f'<= canceller: {requesting_aid}\n' f'=> ipc-parent: {parent_chan}\n' f'|_{ctx}\n' ) @@ -1165,7 +1232,7 @@ class Actor: async def cancel_rpc_tasks( self, - req_uid: tuple[str, str], + req_aid: msgtypes.Aid, # NOTE: when None is passed we cancel **all** rpc # tasks running in this actor! 
@@ -1182,7 +1249,7 @@ class Actor: if not tasks: log.runtime( 'Actor has no cancellable RPC tasks?\n' - f'<= canceller: {req_uid}\n' + f'<= canceller: {req_aid.reprol()}\n' ) return @@ -1222,7 +1289,7 @@ class Actor: ) log.cancel( f'Cancelling {descr} RPC tasks\n\n' - f'<=c) {req_uid} [canceller]\n' + f'<=c) {req_aid} [canceller]\n' f'{rent_chan_repr}' f'c)=> {self.uid} [cancellee]\n' f' |_{self} [with {len(tasks)} tasks]\n' @@ -1250,7 +1317,7 @@ class Actor: await self._cancel_task( cid, task_caller_chan, - requesting_uid=req_uid, + requesting_aid=req_aid, ) if tasks: @@ -1545,8 +1612,9 @@ async def async_main( # 'Blocking on service nursery to exit..\n' ) log.runtime( - "Service nursery complete\n" - "Waiting on root nursery to complete" + 'Service nursery complete\n' + '\n' + '-> Waiting on root nursery to complete' ) # Blocks here as expected until the root nursery is @@ -1696,7 +1764,7 @@ async def async_main( ) teardown_report += ( 'Actor runtime exited\n' - f'{op_nested_actor_repr}\n' + f'{op_nested_actor_repr}' ) log.info(teardown_report) From c00b3c86ea4ce625bc6ffdde774f547afac6db19 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 29 Jun 2025 13:41:10 -0400 Subject: [PATCH 07/25] Hide more `Channel._transport` privates for repr Such as the `MsgTransport.stream` and `.drain` attrs since they're rarely that important at the chan level. Also start adopting a `.=` style for actual attrs of the type versus a `: ` style for meta-field info lines. 
--- tractor/ipc/_chan.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/tractor/ipc/_chan.py b/tractor/ipc/_chan.py index dd7b520f..478b4024 100644 --- a/tractor/ipc/_chan.py +++ b/tractor/ipc/_chan.py @@ -221,14 +221,16 @@ class Channel: if self.aid else '' ) + ( f' |_msgstream: {tpt_name}\n' - f' proto={tpt.laddr.proto_key!r}\n' - f' layer={tpt.layer_key!r}\n' - f' laddr={tpt.laddr}\n' - f' raddr={tpt.raddr}\n' - f' codec={tpt.codec_key!r}\n' - f' stream={tpt.stream}\n' - f' maddr={tpt.maddr!r}\n' - f' drained={tpt.drained}\n' + f' maddr: {tpt.maddr!r}\n' + f' proto: {tpt.laddr.proto_key!r}\n' + f' layer: {tpt.layer_key!r}\n' + f' codec: {tpt.codec_key!r}\n' + f' .laddr={tpt.laddr}\n' + f' .raddr={tpt.raddr}\n' + ) + ( + f' ._transport.stream={tpt.stream}\n' + f' ._transport.drained={tpt.drained}\n' + if privates else '' ) + ( f' _send_lock={tpt._send_lock.statistics()}\n' if privates else '' @@ -444,8 +446,8 @@ class Channel: await self.send(aid) peer_aid: Aid = await self.recv() log.runtime( - f'Received hanshake with peer actor,\n' - f'{peer_aid}\n' + f'Received hanshake with peer ' + f'{peer_aid.reprol(sin_uuid=False)}\n' ) # NOTE, we always are referencing the remote peer! self.aid = peer_aid From 9260909fe1babb394c6c6c84e2eba2e50bcaa017 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 29 Jun 2025 14:47:03 -0400 Subject: [PATCH 08/25] Try `nest_from_op()` in some `._rpc` spots To start trying out, - using in the `Start`-msg handler-block to repr the msg coming *from* a `repr(Channel)` using '<=)` sclang op. - for a completed RPC task in `_invoke_non_context()`. - for the msg loop task's termination report. 
--- tractor/_rpc.py | 101 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 71 insertions(+), 30 deletions(-) diff --git a/tractor/_rpc.py b/tractor/_rpc.py index 2535dcf0..847a81d0 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -64,6 +64,7 @@ from .trionics import ( from .devx import ( debug, add_div, + pformat as _pformat, ) from . import _state from .log import get_logger @@ -72,7 +73,7 @@ from .msg import ( MsgCodec, PayloadT, NamespacePath, - # pretty_struct, + pretty_struct, _ops as msgops, ) from tractor.msg.types import ( @@ -220,11 +221,18 @@ async def _invoke_non_context( task_status.started(ctx) result = await coro fname: str = func.__name__ + + op_nested_task: str = _pformat.nest_from_op( + input_op=f')> cid: {ctx.cid!r}', + text=f'{ctx._task}', + nest_indent=1, # under > + ) log.runtime( - 'RPC complete:\n' - f'task: {ctx._task}\n' - f'|_cid={ctx.cid}\n' - f'|_{fname}() -> {pformat(result)}\n' + f'RPC task complete\n' + f'\n' + f'{op_nested_task}\n' + f'\n' + f')> {fname}() -> {pformat(result)}\n' ) # NOTE: only send result if we know IPC isn't down @@ -1044,7 +1052,7 @@ async def process_messages( ): target_cid: str = kwargs['cid'] kwargs |= { - 'requesting_uid': chan.uid, + 'requesting_aid': chan.aid, 'ipc_msg': msg, # XXX NOTE! ONLY the rpc-task-owning @@ -1080,21 +1088,34 @@ async def process_messages( ns=ns, func=funcname, kwargs=kwargs, # type-spec this? 
see `msg.types` - uid=actorid, + uid=actor_uuid, ): + if actor_uuid != chan.aid.uid: + raise RuntimeError( + f'IPC msg <-> chan.aid mismatch!?\n' + f'Channel.aid = {chan.aid!r}\n' + f'Start.uid = {actor_uuid!r}\n' + ) + # await debug.pause() + op_repr: str = 'Start <=) ' + req_repr: str = _pformat.nest_from_op( + input_op=op_repr, + op_suffix='', + nest_prefix='', + text=f'{chan}', + + nest_indent=len(op_repr)-1, + rm_from_first_ln='<', + # ^XXX, subtract -1 to account for + # > {actor.uid}\n' - f' |_{actor}\n' - f' -> nsp: `{ns}.{funcname}({kwargs})`\n' - - # f' |_{ns}.{funcname}({kwargs})\n\n' - - # f'{pretty_struct.pformat(msg)}\n' + 'Handling RPC request\n' + f'{req_repr}\n' + f'\n' + f'->{{ ipc-context-id: {cid!r}\n' + f'->{{ nsp for fn: `{ns}.{funcname}({kwargs})`\n' ) # runtime-internal endpoint: `Actor.` @@ -1123,10 +1144,6 @@ async def process_messages( await chan.send(err_msg) continue - start_status += ( - f' -> func: {func}\n' - ) - # schedule a task for the requested RPC function # in the actor's main "service nursery". # @@ -1134,7 +1151,7 @@ async def process_messages( # supervision isolation? would avoid having to # manage RPC tasks individually in `._rpc_tasks` # table? - start_status += ' -> scheduling new task..\n' + start_status += '->( scheduling new task..\n' log.runtime(start_status) try: ctx: Context = await actor._service_n.start( @@ -1223,7 +1240,7 @@ async def process_messages( f'|_{chan}\n' ) await actor.cancel_rpc_tasks( - req_uid=actor.uid, + req_aid=actor.aid, # a "self cancel" in terms of the lifetime of the # IPC connection which is presumed to be the # source of any requests for spawned tasks. @@ -1295,13 +1312,37 @@ async def process_messages( finally: # msg debugging for when he machinery is brokey if msg is None: - message: str = 'Exiting IPC msg loop without receiving a msg?' + message: str = 'Exiting RPC-loop without receiving a msg?' 
else: + task_op_repr: str = ')>' + task: trio.Task = trio.lowlevel.current_task() + + # maybe add cancelled opt prefix + if task._cancel_status.effectively_cancelled: + task_op_repr = 'c' + task_op_repr + + task_repr: str = _pformat.nest_from_op( + input_op=task_op_repr, + text=f'{task!r}', + nest_indent=1, + ) + # chan_op_repr: str = '<=} ' + # chan_repr: str = _pformat.nest_from_op( + # input_op=chan_op_repr, + # op_suffix='', + # nest_prefix='', + # text=chan.pformat(), + # nest_indent=len(chan_op_repr)-1, + # rm_from_first_ln='<', + # ) message: str = ( - 'Exiting IPC msg loop with final msg\n\n' - f'<= peer: {chan.uid}\n' - f' |_{chan}\n\n' - # f'{pretty_struct.pformat(msg)}' + f'Exiting RPC-loop with final msg\n' + f'\n' + # f'{chan_repr}\n' + f'{task_repr}\n' + f'\n' + f'{pretty_struct.pformat(msg)}' + f'\n' ) log.runtime(message) From 38944ad1d25098446ce8dff6935a0640e094e9f6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 29 Jun 2025 14:59:50 -0400 Subject: [PATCH 09/25] Drop `actor_info: str` from `._entry` logs --- tractor/_entry.py | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/tractor/_entry.py b/tractor/_entry.py index 8fffed5f..68e72501 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -21,7 +21,7 @@ Sub-process entry points. 
from __future__ import annotations from functools import partial import multiprocessing as mp -import os +# import os from typing import ( Any, TYPE_CHECKING, @@ -38,6 +38,7 @@ from .devx import ( _frame_stack, pformat, ) +# from .msg import pretty_struct from .to_asyncio import run_as_asyncio_guest from ._addr import UnwrappedAddress from ._runtime import ( @@ -127,20 +128,13 @@ def _trio_main( if actor.loglevel is not None: get_console_log(actor.loglevel) - actor_info: str = ( - f'|_{actor}\n' - f' uid: {actor.uid}\n' - f' pid: {os.getpid()}\n' - f' parent_addr: {parent_addr}\n' - f' loglevel: {actor.loglevel}\n' - ) log.info( - 'Starting new `trio` subactor\n' + f'Starting `trio` subactor from parent @ ' + f'{parent_addr}\n' + pformat.nest_from_op( input_op='>(', # see syntax ideas above - text=actor_info, - nest_indent=2, # since "complete" + text=f'{actor}', ) ) logmeth = log.info @@ -149,7 +143,7 @@ def _trio_main( + pformat.nest_from_op( input_op=')>', # like a "closed-to-play"-icon from super perspective - text=actor_info, + text=f'{actor}', nest_indent=1, ) ) @@ -167,7 +161,7 @@ def _trio_main( + pformat.nest_from_op( input_op='c)>', # closed due to cancel (see above) - text=actor_info, + text=f'{actor}', ) ) except BaseException as err: @@ -177,7 +171,7 @@ def _trio_main( + pformat.nest_from_op( input_op='x)>', # closed by error - text=actor_info, + text=f'{actor}', ) ) # NOTE since we raise a tb will already be shown on the From 7d320c4e1e1557fc1ffe5533393c3e1eff5f6eeb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 29 Jun 2025 15:39:09 -0400 Subject: [PATCH 10/25] Mk `Aid` hashable, use pretty-`.__repr__()` Hash on the `.uuid: str` and delegate verbatim to `msg.pretty_struct.Struct`'s equiv method. 
--- tractor/msg/types.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tractor/msg/types.py b/tractor/msg/types.py index f077f132..17d99449 100644 --- a/tractor/msg/types.py +++ b/tractor/msg/types.py @@ -177,6 +177,16 @@ class Aid( f'{self.name}@{self.pid!r}' ) + # mk hashable via `.uuid` + def __hash__(self) -> int: + return hash(self.uuid) + + def __eq__(self, other: Aid) -> bool: + return self.uuid == other.uuid + + # use pretty fmt since often repr-ed for console/log + __repr__ = pretty_struct.Struct.__repr__ + class SpawnSpec( pretty_struct.Struct, From 31544c862cb7ba67d460bef14e22c6c3d48322a2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 29 Jun 2025 15:47:42 -0400 Subject: [PATCH 11/25] More `.ipc.Channel`-repr related tweaks - only generate a repr in `.from_addr()` when log level is >= 'runtime'. |_ add a todo about supporting this optimization more generally on our adapter. - fix `Channel.pformat()` to show unknown peer field line fmt correctly. - add a `Channel.maddr: str` which just delegates directly to the `._transport` like other pass-thru property fields. --- tractor/ipc/_chan.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/tractor/ipc/_chan.py b/tractor/ipc/_chan.py index 478b4024..0f36b056 100644 --- a/tractor/ipc/_chan.py +++ b/tractor/ipc/_chan.py @@ -171,11 +171,23 @@ class Channel: ) assert transport.raddr == addr chan = Channel(transport=transport) - log.runtime( - f'Connected channel IPC transport\n' - f'[>\n' - f' |_{chan}\n' - ) + + # ?TODO, compact this into adapter level-methods? + # -[ ] would avoid extra repr-calcs if level not active? + # |_ how would the `calc_if_level` look though? func? 
+ if log.at_least_level('runtime'): + from tractor.devx import ( + pformat as _pformat, + ) + chan_repr: str = _pformat.nest_from_op( + input_op='[>', + text=chan.pformat(), + nest_indent=1, + ) + log.runtime( + f'Connected channel IPC transport\n' + f'{chan_repr}' + ) return chan @cm @@ -218,7 +230,7 @@ class Channel: if privates else '' ) + ( # peer-actor (processs) section f' |_peer: {self.aid.reprol()!r}\n' - if self.aid else '' + if self.aid else ' |_peer: \n' ) + ( f' |_msgstream: {tpt_name}\n' f' maddr: {tpt.maddr!r}\n' @@ -259,6 +271,10 @@ class Channel: def raddr(self) -> Address|None: return self._transport.raddr if self._transport else None + @property + def maddr(self) -> str: + return self._transport.maddr if self._transport else '' + # TODO: something like, # `pdbp.hideframe_on(errors=[MsgTypeError])` # instead of the `try/except` hack we have rn.. From a23a98886c8ddfe9914b6294ac156930cb8cc587 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 3 Jul 2025 23:33:02 -0400 Subject: [PATCH 12/25] Even more `.ipc.*` repr refinements Mostly adjusting indentation, noise level, and clarity via `.pformat()` tweaks and more general use of `.devx.pformat.nest_from_op()`. Specific impl deats, - use `pformat.ppfmt()`/`nest_from_op()` more seriously throughout `._server`. - add a `._server.Endpoint.pformat()`. - add `._server.Server.len_peers()` and `.repr_state()`. - polish `Server.pformat()`. - drop some redundant `log.runtime()`s from `._serve_ipc_eps()` instead leaving-them-only/putting-them in the caller pub meth. - `._tcp.start_listener()` log the bound addr, not the input (which may be the 0-port).
--- tractor/ipc/_chan.py | 4 +- tractor/ipc/_server.py | 265 ++++++++++++++++++++++++++++------------- tractor/ipc/_tcp.py | 12 +- 3 files changed, 191 insertions(+), 90 deletions(-) diff --git a/tractor/ipc/_chan.py b/tractor/ipc/_chan.py index 0f36b056..64643d95 100644 --- a/tractor/ipc/_chan.py +++ b/tractor/ipc/_chan.py @@ -462,8 +462,8 @@ class Channel: await self.send(aid) peer_aid: Aid = await self.recv() log.runtime( - f'Received hanshake with peer ' - f'{peer_aid.reprol(sin_uuid=False)}\n' + f'Received hanshake with peer\n' + f'<= {peer_aid.reprol(sin_uuid=False)}\n' ) # NOTE, we always are referencing the remote peer! self.aid = peer_aid diff --git a/tractor/ipc/_server.py b/tractor/ipc/_server.py index a8732c10..e857db19 100644 --- a/tractor/ipc/_server.py +++ b/tractor/ipc/_server.py @@ -26,7 +26,7 @@ from contextlib import ( from functools import partial from itertools import chain import inspect -from pprint import pformat +import textwrap from types import ( ModuleType, ) @@ -43,7 +43,10 @@ from trio import ( SocketListener, ) -# from ..devx import debug +from ..devx.pformat import ( + ppfmt, + nest_from_op, +) from .._exceptions import ( TransportClosed, ) @@ -141,9 +144,8 @@ async def maybe_wait_on_canced_subs( ): log.cancel( - 'Waiting on cancel request to peer..\n' - f'c)=>\n' - f' |_{chan.aid}\n' + 'Waiting on cancel request to peer\n' + f'c)=> {chan.aid.reprol()}@[{chan.maddr}]\n' ) # XXX: this is a soft wait on the channel (and its @@ -179,7 +181,7 @@ async def maybe_wait_on_canced_subs( log.warning( 'Draining msg from disconnected peer\n' f'{chan_info}' - f'{pformat(msg)}\n' + f'{ppfmt(msg)}\n' ) # cid: str|None = msg.get('cid') cid: str|None = msg.cid @@ -248,7 +250,7 @@ async def maybe_wait_on_canced_subs( if children := local_nursery._children: # indent from above local-nurse repr report += ( - f' |_{pformat(children)}\n' + f' |_{ppfmt(children)}\n' ) log.warning(report) @@ -279,8 +281,9 @@ async def maybe_wait_on_canced_subs( log.runtime( 
f'Peer IPC broke but subproc is alive?\n\n' - f'<=x {chan.aid}@{chan.raddr}\n' - f' |_{proc}\n' + f'<=x {chan.aid.reprol()}@[{chan.maddr}]\n' + f'\n' + f'{proc}\n' ) return local_nursery @@ -324,9 +327,10 @@ async def handle_stream_from_peer( chan = Channel.from_stream(stream) con_status: str = ( - 'New inbound IPC connection <=\n' - f'|_{chan}\n' + f'New inbound IPC transport connection\n' + f'<=( {stream!r}\n' ) + con_status_steps: str = '' # initial handshake with peer phase try: @@ -372,7 +376,7 @@ async def handle_stream_from_peer( if _pre_chan := server._peers.get(uid): familiar: str = 'pre-existing-peer' uid_short: str = f'{uid[0]}[{uid[1][-6:]}]' - con_status += ( + con_status_steps += ( f' -> Handshake with {familiar} `{uid_short}` complete\n' ) @@ -397,7 +401,7 @@ async def handle_stream_from_peer( None, ) if event: - con_status += ( + con_status_steps += ( ' -> Waking subactor spawn waiters: ' f'{event.statistics().tasks_waiting}\n' f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n' @@ -408,7 +412,7 @@ async def handle_stream_from_peer( event.set() else: - con_status += ( + con_status_steps += ( f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n' ) # type: ignore @@ -422,8 +426,15 @@ async def handle_stream_from_peer( # TODO: can we just use list-ref directly? chans.append(chan) - con_status += ' -> Entering RPC msg loop..\n' - log.runtime(con_status) + con_status_steps += ' -> Entering RPC msg loop..\n' + log.runtime( + con_status + + + textwrap.indent( + con_status_steps, + prefix=' '*3, # align to first-ln + ) + ) # Begin channel management - respond to remote requests and # process received reponses. 
@@ -456,41 +467,67 @@ async def handle_stream_from_peer( disconnected=disconnected, ) - # ``Channel`` teardown and closure sequence + # `Channel` teardown and closure sequence # drop ref to channel so it can be gc-ed and disconnected - con_teardown_status: str = ( - f'IPC channel disconnected:\n' - f'<=x uid: {chan.aid}\n' - f' |_{pformat(chan)}\n\n' + # + # -[x]TODO mk this be like + # <=x Channel( + # |_field: blah + # )> + op_repr: str = '<=x ' + chan_repr: str = nest_from_op( + input_op=op_repr, + op_suffix='', + nest_prefix='', + text=chan.pformat(), + nest_indent=len(op_repr)-1, + rm_from_first_ln='<', ) + + con_teardown_status: str = ( + f'IPC channel disconnect\n' + f'\n' + f'{chan_repr}\n' + f'\n' + ) + chans.remove(chan) # TODO: do we need to be this pedantic? if not chans: con_teardown_status += ( - f'-> No more channels with {chan.aid}' + f'-> No more channels with {chan.aid.reprol()!r}\n' ) server._peers.pop(uid, None) - peers_str: str = '' - for uid, chans in server._peers.items(): - peers_str += ( - f'uid: {uid}\n' - ) - for i, chan in enumerate(chans): - peers_str += ( - f' |_[{i}] {pformat(chan)}\n' + if peers := list(server._peers.values()): + peer_cnt: int = len(peers) + if ( + (first := peers[0][0]) is not chan + and + not disconnected + and + peer_cnt > 1 + ): + con_teardown_status += ( + f'-> Remaining IPC {peer_cnt-1!r} peers:\n' ) - - con_teardown_status += ( - f'-> Remaining IPC {len(server._peers)} peers: {peers_str}\n' - ) + for chans in server._peers.values(): + first: Channel = chans[0] + if not ( + first is chan + and + disconnected + ): + con_teardown_status += ( + f' |_{first.aid.reprol()!r} -> {len(chans)!r} chans\n' + ) # No more channels to other actors (at all) registered # as connected. 
if not server._peers: con_teardown_status += ( - 'Signalling no more peer channel connections' + '-> Signalling no more peer connections!\n' ) server._no_more_peers.set() @@ -579,10 +616,10 @@ async def handle_stream_from_peer( class Endpoint(Struct): ''' - An instance of an IPC "bound" address where the lifetime of the - "ability to accept connections" (from clients) and then handle - those inbound sessions or sequences-of-packets is determined by - a (maybe pair of) nurser(y/ies). + An instance of an IPC "bound" address where the lifetime of an + "ability to accept connections" and handle the subsequent + sequence-of-packets (maybe oriented as sessions) is determined by + the underlying nursery scope(s). ''' addr: Address @@ -600,6 +637,24 @@ class Endpoint(Struct): MsgTransport, # handle to encoded-msg transport stream ] = {} + def pformat( + self, + indent: int = 0, + privates: bool = False, + ) -> str: + type_repr: str = type(self).__name__ + fmtstr: str = ( + # !TODO, always be ns aware! + # f'|_netns: {netns}\n' + f' |.addr: {self.addr!r}\n' + f' |_peers: {len(self.peer_tpts)}\n' + ) + return ( + f'<{type_repr}(\n' + f'{fmtstr}' + f')>' + ) + async def start_listener(self) -> SocketListener: tpt_mod: ModuleType = inspect.getmodule(self.addr) lstnr: SocketListener = await tpt_mod.start_listener( @@ -639,11 +694,13 @@ class Endpoint(Struct): class Server(Struct): _parent_tn: Nursery _stream_handler_tn: Nursery + # level-triggered sig for whether "no peers are currently # connected"; field is **always** set to an instance but # initialized with `.is_set() == True`. 
_no_more_peers: trio.Event + # active eps as allocated by `.listen_on()` _endpoints: list[Endpoint] = [] # connection tracking & mgmt @@ -651,12 +708,19 @@ class Server(Struct): str, # uaid list[Channel], # IPC conns from peer ] = defaultdict(list) + + # events-table with entries registered unset while the local + # actor is waiting on a new actor to inbound connect, often + # a parent waiting on its child just after spawn. _peer_connected: dict[ tuple[str, str], trio.Event, ] = {} # syncs for setup/teardown sequences + # - null when not yet booted, + # - unset when active, + # - set when fully shutdown with 0 eps active. _shutdown: trio.Event|None = None # TODO, maybe just make `._endpoints: list[Endpoint]` and @@ -664,7 +728,6 @@ class Server(Struct): # @property # def addrs2eps(self) -> dict[Address, Endpoint]: # ... - @property def proto_keys(self) -> list[str]: return [ @@ -690,7 +753,7 @@ class Server(Struct): # TODO: obvi a different server type when we eventually # support some others XD log.runtime( - f'Cancelling server(s) for\n' + f'Cancelling server(s) for tpt-protos\n' f'{self.proto_keys!r}\n' ) self._parent_tn.cancel_scope.cancel() @@ -717,6 +780,14 @@ class Server(Struct): f'protos: {tpt_protos!r}\n' ) + def len_peers( + self, + ) -> int: + return len([ + chan.connected() + for chan in chain(*self._peers.values()) + ]) + def has_peers( self, check_chans: bool = False, @@ -730,13 +801,11 @@ class Server(Struct): has_peers and check_chans + and + (peer_cnt := self.len_peers()) ): has_peers: bool = ( - any(chan.connected() - for chan in chain( - *self._peers.values() - ) - ) + peer_cnt > 0 and has_peers ) @@ -803,30 +872,66 @@ class Server(Struct): return ev.is_set() - def pformat(self) -> str: + @property + def repr_state(self) -> str: + ''' + A `str`-status describing the current state of this + IPC server in terms of the current operating "phase". 
+ + ''' + status = 'server is active' + if self.has_peers(): + peer_cnt: int = self.len_peers() + status: str = ( + f'{peer_cnt!r} peer chans' + ) + else: + status: str = 'No peer chans' + + if self.is_shutdown(): + status: str = 'server-shutdown' + + return status + + def pformat( + self, + privates: bool = False, + ) -> str: eps: list[Endpoint] = self._endpoints - state_repr: str = ( - f'{len(eps)!r} IPC-endpoints active' - ) + # state_repr: str = ( + # f'{len(eps)!r} endpoints active' + # ) fmtstr = ( - f' |_state: {state_repr}\n' - f' no_more_peers: {self.has_peers()}\n' + f' |_state: {self.repr_state!r}\n' ) - if self._shutdown is not None: - shutdown_stats: EventStatistics = self._shutdown.statistics() + if privates: + fmtstr += f' no_more_peers: {self.has_peers()}\n' + + if self._shutdown is not None: + shutdown_stats: EventStatistics = self._shutdown.statistics() + fmtstr += ( + f' task_waiting_on_shutdown: {shutdown_stats}\n' + ) + + if eps := self._endpoints: + addrs: list[tuple] = [ + ep.addr for ep in eps + ] + repr_eps: str = ppfmt(addrs) + fmtstr += ( - f' task_waiting_on_shutdown: {shutdown_stats}\n' + f' |_endpoints: {repr_eps}\n' + # ^TODO? how to indent closing ']'.. ) - fmtstr += ( - # TODO, use the `ppfmt()` helper from `modden`! 
- f' |_endpoints: {pformat(self._endpoints)}\n' - f' |_peers: {len(self._peers)} connected\n' - ) + if peers := self._peers: + fmtstr += ( + f' |_peers: {len(peers)} connected\n' + ) return ( - f'\n' ) @@ -885,8 +990,8 @@ class Server(Struct): ) log.runtime( - f'Binding to endpoints for,\n' - f'{accept_addrs}\n' + f'Binding endpoints\n' + f'{ppfmt(accept_addrs)}\n' ) eps: list[Endpoint] = await self._parent_tn.start( partial( @@ -896,13 +1001,19 @@ class Server(Struct): listen_addrs=accept_addrs, ) ) + self._endpoints.extend(eps) + + serv_repr: str = nest_from_op( + input_op='(>', + text=self.pformat(), + nest_indent=1, + ) log.runtime( - f'Started IPC endpoints\n' - f'{eps}\n' + f'Started IPC server\n' + f'{serv_repr}' ) - self._endpoints.extend(eps) - # XXX, just a little bit of sanity + # XXX, a little sanity on new ep allocations group_tn: Nursery|None = None ep: Endpoint for ep in eps: @@ -956,9 +1067,13 @@ async def _serve_ipc_eps( stream_handler_tn=stream_handler_tn, ) try: + ep_sclang: str = nest_from_op( + input_op='>[', + text=f'{ep.pformat()}', + ) log.runtime( f'Starting new endpoint listener\n' - f'{ep}\n' + f'{ep_sclang}\n' ) listener: trio.abc.Listener = await ep.start_listener() assert listener is ep._listener @@ -996,17 +1111,6 @@ async def _serve_ipc_eps( handler_nursery=stream_handler_tn ) ) - # TODO, wow make this message better! XD - log.runtime( - 'Started server(s)\n' - + - '\n'.join([f'|_{addr}' for addr in listen_addrs]) - ) - - log.runtime( - f'Started IPC endpoints\n' - f'{eps}\n' - ) task_status.started( eps, ) @@ -1049,8 +1153,7 @@ async def open_ipc_server( try: yield ipc_server log.runtime( - f'Waiting on server to shutdown or be cancelled..\n' - f'{ipc_server}' + 'Server-tn running until terminated\n' ) # TODO? when if ever would we want/need this? 
# with trio.CancelScope(shield=True): diff --git a/tractor/ipc/_tcp.py b/tractor/ipc/_tcp.py index e945cdfb..a1f511d5 100644 --- a/tractor/ipc/_tcp.py +++ b/tractor/ipc/_tcp.py @@ -160,10 +160,9 @@ async def start_listener( Start a TCP socket listener on the given `TCPAddress`. ''' - log.info( - f'Attempting to bind TCP socket\n' - f'>[\n' - f'|_{addr}\n' + log.runtime( + f'Trying socket bind\n' + f'>[ {addr}\n' ) # ?TODO, maybe we should just change the lower-level call this is # using internall per-listener? @@ -178,11 +177,10 @@ async def start_listener( assert len(listeners) == 1 listener = listeners[0] host, port = listener.socket.getsockname()[:2] - + bound_addr: TCPAddress = type(addr).from_addr((host, port)) log.info( f'Listening on TCP socket\n' - f'[>\n' - f' |_{addr}\n' + f'[> {bound_addr}\n' ) return listener From 4bd8211abb862bf451f27452f3a87adb8a47ab51 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Jul 2025 15:01:29 -0400 Subject: [PATCH 13/25] Add #TODO for `._context` to use `.msg.Aid` --- tractor/_context.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tractor/_context.py b/tractor/_context.py index 6d817d58..e33b9d6f 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -740,6 +740,8 @@ class Context: # cancelled, NOT their reported canceller. IOW in the # latter case we're cancelled by someone else getting # cancelled. + # + # !TODO, switching to `Actor.aid` here! if (canc := error.canceller) == self._actor.uid: whom: str = 'us' self._canceller = canc From 7be713ee1e88c5685781003637a02ef8834ad623 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 7 Jul 2025 10:59:00 -0400 Subject: [PATCH 14/25] Use `nest_from_op()` in actor-nursery shutdown Including a new one-line `_shutdown_msg: str` which we mod-var-set for testing usage and some denoising at `.info()` level. Adjust `Actor()` instantiating input to the new `.registry_addrs` wrapped addrs property. 
--- tractor/_supervise.py | 44 +++++++++++++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/tractor/_supervise.py b/tractor/_supervise.py index e1775292..9fdad8ce 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -21,7 +21,6 @@ from contextlib import asynccontextmanager as acm from functools import partial import inspect -from pprint import pformat from typing import ( TYPE_CHECKING, ) @@ -31,7 +30,10 @@ import warnings import trio -from .devx.debug import maybe_wait_for_debugger +from .devx import ( + debug, + pformat as _pformat, +) from ._addr import ( UnwrappedAddress, mk_uuid, @@ -199,7 +201,7 @@ class ActorNursery: loglevel=loglevel, # verbatim relay this actor's registrar addresses - registry_addrs=current_actor().reg_addrs, + registry_addrs=current_actor().registry_addrs, ) parent_addr: UnwrappedAddress = self._actor.accept_addr assert parent_addr @@ -453,7 +455,7 @@ async def _open_and_supervise_one_cancels_all_nursery( # the "hard join phase". log.runtime( 'Waiting on subactors to complete:\n' - f'{pformat(an._children)}\n' + f'>}} {len(an._children)}\n' ) an._join_procs.set() @@ -467,7 +469,7 @@ async def _open_and_supervise_one_cancels_all_nursery( # will make the pdb repl unusable. # Instead try to wait for pdb to be released before # tearing down. - await maybe_wait_for_debugger( + await debug.maybe_wait_for_debugger( child_in_debug=an._at_least_one_child_in_debug ) @@ -543,7 +545,7 @@ async def _open_and_supervise_one_cancels_all_nursery( # XXX: yet another guard before allowing the cancel # sequence in case a (single) child is in debug. 
- await maybe_wait_for_debugger( + await debug.maybe_wait_for_debugger( child_in_debug=an._at_least_one_child_in_debug ) @@ -592,6 +594,11 @@ async def _open_and_supervise_one_cancels_all_nursery( # final exit +_shutdown_msg: str = ( + 'Actor-runtime-shutdown' +) + + @acm # @api_frame async def open_nursery( @@ -679,17 +686,26 @@ async def open_nursery( ): __tracebackhide__: bool = False - msg: str = ( - 'Actor-nursery exited\n' - f'|_{an}\n' + + op_nested_an_repr: str = _pformat.nest_from_op( + input_op=')>', + text=f'{an}', + # nest_prefix='|_', + nest_indent=1, # under > ) + an_msg: str = ( + f'Actor-nursery exited\n' + f'{op_nested_an_repr}\n' + ) + # keep noise low during std operation. + log.runtime(an_msg) if implicit_runtime: # shutdown runtime if it was started and report noisly # that we're did so. - msg += '=> Shutting down actor runtime <=\n' + msg: str = ( + '\n' + '\n' + f'{_shutdown_msg} )>\n' + ) log.info(msg) - - else: - # keep noise low during std operation. - log.runtime(msg) From 8880a80e3eb1ab5525480e21c844f942c0b0120d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 7 Jul 2025 11:02:47 -0400 Subject: [PATCH 15/25] Use `nest_from_op()`/`pretty_struct` in `._rpc` Again for nicer console logging. Also fix a double `req_chan` arg bug when passed to `_invoke` in the `self.cancel()` rt-ep; don't update the `kwargs: dict` just merge in `req_chan` input at call time. 
--- tractor/_rpc.py | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/tractor/_rpc.py b/tractor/_rpc.py index 847a81d0..2bd4d6e3 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -672,7 +672,8 @@ async def _invoke( ctx._result = res log.runtime( f'Sending result msg and exiting {ctx.side!r}\n' - f'{return_msg}\n' + f'\n' + f'{pretty_struct.pformat(return_msg)}\n' ) await chan.send(return_msg) @@ -840,12 +841,12 @@ async def _invoke( else: descr_str += f'\n{merr!r}\n' else: - descr_str += f'\nand final result {ctx.outcome!r}\n' + descr_str += f'\nwith final result {ctx.outcome!r}\n' logmeth( - message - + - descr_str + f'{message}\n' + f'\n' + f'{descr_str}\n' ) @@ -1012,8 +1013,6 @@ async def process_messages( cid=cid, kwargs=kwargs, ): - kwargs |= {'req_chan': chan} - # XXX NOTE XXX don't start entire actor # runtime cancellation if this actor is # currently in debug mode! @@ -1032,14 +1031,14 @@ async def process_messages( cid, chan, actor.cancel, - kwargs, + kwargs | {'req_chan': chan}, is_rpc=False, return_msg_type=CancelAck, ) log.runtime( - 'Cancelling IPC transport msg-loop with peer:\n' - f'|_{chan}\n' + 'Cancelling RPC-msg-loop with peer\n' + f'->c}} {chan.aid.reprol()}@[{chan.maddr}]\n' ) loop_cs.cancel() break @@ -1235,9 +1234,21 @@ async def process_messages( # END-OF `async for`: # IPC disconnected via `trio.EndOfChannel`, likely # due to a (graceful) `Channel.aclose()`. 
+ + chan_op_repr: str = '<=x] ' + chan_repr: str = _pformat.nest_from_op( + input_op=chan_op_repr, + op_suffix='', + nest_prefix='', + text=chan.pformat(), + nest_indent=len(chan_op_repr)-1, + rm_from_first_ln='<', + ) log.runtime( - f'channel for {chan.uid} disconnected, cancelling RPC tasks\n' - f'|_{chan}\n' + f'IPC channel disconnected\n' + f'{chan_repr}\n' + f'\n' + f'->c) cancelling RPC tasks.\n' ) await actor.cancel_rpc_tasks( req_aid=actor.aid, From 0ca3d506029076fd9b8e718f9b8599756db67dc6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 7 Jul 2025 14:31:34 -0400 Subject: [PATCH 16/25] Use `._supervise._shutdown_msg` in tooling test --- tests/devx/test_tooling.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/devx/test_tooling.py b/tests/devx/test_tooling.py index c8cf4c8d..697b2bc1 100644 --- a/tests/devx/test_tooling.py +++ b/tests/devx/test_tooling.py @@ -121,9 +121,11 @@ def test_shield_pause( child.pid, signal.SIGINT, ) + from tractor._supervise import _shutdown_msg expect( child, - 'Shutting down actor runtime', + # 'Shutting down actor runtime', + _shutdown_msg, timeout=6, ) assert_before( From 5d87f63377d5af8a46928f9e5e779907c15b72e4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 7 Jul 2025 15:46:53 -0400 Subject: [PATCH 17/25] Update buncha log msg fmting in `._portal` Namely to use `Channel.aid.reprol()` and converting to our newer style multi-line code style for str-reports. --- tractor/_portal.py | 75 ++++++++++++++++++++++++++++++---------------- 1 file changed, 50 insertions(+), 25 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index c741df7d..659ddf6d 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -115,6 +115,10 @@ class Portal: @property def chan(self) -> Channel: + ''' + Ref to this ctx's underlying `tractor.ipc.Channel`. 
+ + ''' return self._chan @property @@ -174,10 +178,17 @@ class Portal: # not expecting a "main" result if self._expect_result_ctx is None: + peer_id: str = f'{self.channel.aid.reprol()!r}' log.warning( - f"Portal for {self.channel.aid} not expecting a final" - " result?\nresult() should only be called if subactor" - " was spawned with `ActorNursery.run_in_actor()`") + f'Portal to peer {peer_id} will not deliver a final result?\n' + f'\n' + f'Context.result() can only be called by the parent of ' + f'a sub-actor when it was spawned with ' + f'`ActorNursery.run_in_actor()`' + f'\n' + f'Further this `ActorNursery`-method-API will deprecated in the' + f'near fututre!\n' + ) return NoResult # expecting a "main" result @@ -210,6 +221,7 @@ class Portal: typname: str = type(self).__name__ log.warning( f'`{typname}.result()` is DEPRECATED!\n' + f'\n' f'Use `{typname}.wait_for_result()` instead!\n' ) return await self.wait_for_result( @@ -221,8 +233,10 @@ class Portal: # terminate all locally running async generator # IPC calls if self._streams: - log.cancel( - f"Cancelling all streams with {self.channel.aid}") + peer_id: str = f'{self.channel.aid.reprol()!r}' + report: str = ( + f'Cancelling all msg-streams with {peer_id}\n' + ) for stream in self._streams.copy(): try: await stream.aclose() @@ -231,10 +245,18 @@ class Portal: # (unless of course at some point down the road we # won't expect this to always be the case or need to # detect it for respawning purposes?) 
- log.debug(f"{stream} was already closed.") + report += ( + f'->) {stream!r} already closed\n' + ) + + log.cancel(report) async def aclose(self): - log.debug(f"Closing {self}") + log.debug( + f'Closing portal\n' + f'>}}\n' + f'|_{self}\n' + ) # TODO: once we move to implementing our own `ReceiveChannel` # (including remote task cancellation inside its `.aclose()`) # we'll need to .aclose all those channels here @@ -260,19 +282,18 @@ class Portal: __runtimeframe__: int = 1 # noqa chan: Channel = self.channel + peer_id: str = f'{self.channel.aid.reprol()!r}' if not chan.connected(): log.runtime( - 'This channel is already closed, skipping cancel request..' + 'Peer {peer_id} is already disconnected\n' + '-> skipping cancel request..\n' ) return False - reminfo: str = ( - f'c)=> {self.channel.aid}\n' - f' |_{chan}\n' - ) log.cancel( - f'Requesting actor-runtime cancel for peer\n\n' - f'{reminfo}' + f'Sending actor-runtime-cancel-req to peer\n' + f'\n' + f'c)=> {peer_id}\n' ) # XXX the one spot we set it? @@ -297,8 +318,9 @@ class Portal: # may timeout and we never get an ack (obvi racy) # but that doesn't mean it wasn't cancelled. 
log.debug( - 'May have failed to cancel peer?\n' - f'{reminfo}' + f'May have failed to cancel peer?\n' + f'\n' + f'c)=?> {peer_id}\n' ) # if we get here some weird cancellation case happened @@ -316,22 +338,22 @@ class Portal: TransportClosed, ) as tpt_err: - report: str = ( - f'IPC chan for actor already closed or broken?\n\n' - f'{self.channel.aid}\n' - f' |_{self.channel}\n' + ipc_borked_report: str = ( + f'IPC for actor already closed/broken?\n\n' + f'\n' + f'c)=x> {peer_id}\n' ) match tpt_err: case TransportClosed(): - log.debug(report) + log.debug(ipc_borked_report) case _: - report += ( + ipc_borked_report += ( f'\n' f'Unhandled low-level transport-closed/error during\n' f'Portal.cancel_actor()` request?\n' f'<{type(tpt_err).__name__}( {tpt_err} )>\n' ) - log.warning(report) + log.warning(ipc_borked_report) return False @@ -488,10 +510,13 @@ class Portal: with trio.CancelScope(shield=True): await ctx.cancel() - except trio.ClosedResourceError: + except trio.ClosedResourceError as cre: # if the far end terminates before we send a cancel the # underlying transport-channel may already be closed. - log.cancel(f'Context {ctx} was already closed?') + log.cancel( + f'Context.cancel() -> {cre!r}\n' + f'cid: {ctx.cid!r} already closed?\n' + ) # XXX: should this always be done? # await recv_chan.aclose() From d34fb54f7cd8e529dce3fabbc1782522b71ae685 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 7 Jul 2025 19:11:01 -0400 Subject: [PATCH 18/25] Update buncha log msg fmting in `._spawn` Again using `Channel.aid.reprol()`, `.devx.pformat.nest_from_op()` and converting to multi-line code style an ' for str-report-contents. Tweak some imports to sub-mod level as well. 
--- tractor/_spawn.py | 68 ++++++++++++++++++++++++++++------------------- 1 file changed, 40 insertions(+), 28 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index a3e3194e..1faadd73 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -34,9 +34,9 @@ from typing import ( import trio from trio import TaskStatus -from .devx.debug import ( - maybe_wait_for_debugger, - acquire_debug_lock, +from .devx import ( + debug, + pformat as _pformat ) from tractor._state import ( current_actor, @@ -51,14 +51,17 @@ from tractor._portal import Portal from tractor._runtime import Actor from tractor._entry import _mp_main from tractor._exceptions import ActorFailure -from tractor.msg.types import ( - Aid, - SpawnSpec, +from tractor.msg import ( + types as msgtypes, + pretty_struct, ) if TYPE_CHECKING: - from ipc import IPCServer + from ipc import ( + _server, + Channel, + ) from ._supervise import ActorNursery ProcessType = TypeVar('ProcessType', mp.Process, trio.Process) @@ -328,20 +331,21 @@ async def soft_kill( see `.hard_kill()`). ''' - peer_aid: Aid = portal.channel.aid + chan: Channel = portal.channel + peer_aid: msgtypes.Aid = chan.aid try: log.cancel( f'Soft killing sub-actor via portal request\n' f'\n' - f'(c=> {peer_aid}\n' - f' |_{proc}\n' + f'c)=> {peer_aid.reprol()}@[{chan.maddr}]\n' + f' |_{proc}\n' ) # wait on sub-proc to signal termination await wait_func(proc) except trio.Cancelled: with trio.CancelScope(shield=True): - await maybe_wait_for_debugger( + await debug.maybe_wait_for_debugger( child_in_debug=_runtime_vars.get( '_debug_mode', False ), @@ -465,7 +469,7 @@ async def trio_proc( "--uid", # TODO, how to pass this over "wire" encodings like # cmdline args? - # -[ ] maybe we can add an `Aid.min_tuple()` ? + # -[ ] maybe we can add an `msgtypes.Aid.min_tuple()` ? 
str(subactor.uid), # Address the child must connect to on startup "--parent_addr", @@ -483,13 +487,14 @@ async def trio_proc( cancelled_during_spawn: bool = False proc: trio.Process|None = None - ipc_server: IPCServer = actor_nursery._actor.ipc_server + ipc_server: _server.Server = actor_nursery._actor.ipc_server try: try: proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs) log.runtime( - 'Started new child\n' - f'|_{proc}\n' + f'Started new child subproc\n' + f'(>\n' + f' |_{proc}\n' ) # wait for actor to spawn and connect back to us @@ -507,10 +512,10 @@ async def trio_proc( with trio.CancelScope(shield=True): # don't clobber an ongoing pdb if is_root_process(): - await maybe_wait_for_debugger() + await debug.maybe_wait_for_debugger() elif proc is not None: - async with acquire_debug_lock(subactor.uid): + async with debug.acquire_debug_lock(subactor.uid): # soft wait on the proc to terminate with trio.move_on_after(0.5): await proc.wait() @@ -528,14 +533,19 @@ async def trio_proc( # send a "spawning specification" which configures the # initial runtime state of the child. - sspec = SpawnSpec( + sspec = msgtypes.SpawnSpec( _parent_main_data=subactor._parent_main_data, enable_modules=subactor.enable_modules, reg_addrs=subactor.reg_addrs, bind_addrs=bind_addrs, _runtime_vars=_runtime_vars, ) - log.runtime(f'Sending spawn spec: {str(sspec)}') + log.runtime( + f'Sending spawn spec to child\n' + f'{{}}=> {chan.aid.reprol()!r}\n' + f'\n' + f'{pretty_struct.pformat(sspec)}\n' + ) await chan.send(sspec) # track subactor in current nursery @@ -563,7 +573,7 @@ async def trio_proc( # condition. await soft_kill( proc, - trio.Process.wait, + trio.Process.wait, # XXX, uses `pidfd_open()` below. 
portal ) @@ -571,8 +581,7 @@ async def trio_proc( # tandem if not done already log.cancel( 'Cancelling portal result reaper task\n' - f'>c)\n' - f' |_{subactor.uid}\n' + f'c)> {subactor.aid.reprol()!r}\n' ) nursery.cancel_scope.cancel() @@ -581,21 +590,24 @@ async def trio_proc( # allowed! Do this **after** cancellation/teardown to avoid # killing the process too early. if proc: + reap_repr: str = _pformat.nest_from_op( + input_op='>x)', + text=subactor.pformat(), + ) log.cancel( f'Hard reap sequence starting for subactor\n' - f'>x)\n' - f' |_{subactor}@{subactor.uid}\n' + f'{reap_repr}' ) with trio.CancelScope(shield=True): # don't clobber an ongoing pdb if cancelled_during_spawn: # Try again to avoid TTY clobbering. - async with acquire_debug_lock(subactor.uid): + async with debug.acquire_debug_lock(subactor.uid): with trio.move_on_after(0.5): await proc.wait() - await maybe_wait_for_debugger( + await debug.maybe_wait_for_debugger( child_in_debug=_runtime_vars.get( '_debug_mode', False ), @@ -624,7 +636,7 @@ async def trio_proc( # acquire the lock and get notified of who has it, # check that uid against our known children? # this_uid: tuple[str, str] = current_actor().uid - # await acquire_debug_lock(this_uid) + # await debug.acquire_debug_lock(this_uid) if proc.poll() is None: log.cancel(f"Attempting to hard kill {proc}") @@ -727,7 +739,7 @@ async def mp_proc( log.runtime(f"Started {proc}") - ipc_server: IPCServer = actor_nursery._actor.ipc_server + ipc_server: _server.Server = actor_nursery._actor.ipc_server try: # wait for actor to spawn and connect back to us # channel should have handshake completed by the From 414b0e2baeeb797a3bee2e5434540063204e5342 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Jul 2025 11:01:44 -0400 Subject: [PATCH 19/25] Update buncha log msg fmting in `.msg._ops` Mostly just multi-line code styling again: always putting standalone `'f\n'` on separate LOC so it reads like it renders to console. 
Oh and and a level drop to `.runtime()` for rx-msg reports. --- tractor/msg/_ops.py | 41 +++++++++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/tractor/msg/_ops.py b/tractor/msg/_ops.py index 9a9c9914..1dad63c8 100644 --- a/tractor/msg/_ops.py +++ b/tractor/msg/_ops.py @@ -210,12 +210,14 @@ class PldRx(Struct): match msg: case Return()|Error(): log.runtime( - f'Rxed final outcome msg\n' + f'Rxed final-outcome msg\n' + f'\n' f'{msg}\n' ) case Stop(): log.runtime( f'Rxed stream stopped msg\n' + f'\n' f'{msg}\n' ) if passthrough_non_pld_msgs: @@ -261,8 +263,9 @@ class PldRx(Struct): if ( type(msg) is Return ): - log.info( + log.runtime( f'Rxed final result msg\n' + f'\n' f'{msg}\n' ) return self.decode_pld( @@ -304,10 +307,13 @@ class PldRx(Struct): try: pld: PayloadT = self._pld_dec.decode(pld) log.runtime( - 'Decoded msg payload\n\n' + f'Decoded payload for\n' + # f'\n' f'{msg}\n' - f'where payload decoded as\n' - f'|_pld={pld!r}\n' + # ^TODO?, ideally just render with `, + # pld={decode}` in the `msg.pformat()`?? + f'where, ' + f'{type(msg).__name__}.pld={pld!r}\n' ) return pld except TypeError as typerr: @@ -494,7 +500,8 @@ def limit_plds( finally: log.runtime( - 'Reverted to previous payload-decoder\n\n' + f'Reverted to previous payload-decoder\n' + f'\n' f'{orig_pldec}\n' ) # sanity on orig settings @@ -629,7 +636,8 @@ async def drain_to_final_msg( (local_cs := rent_n.cancel_scope).cancel_called ): log.cancel( - 'RPC-ctx cancelled by local-parent scope during drain!\n\n' + f'RPC-ctx cancelled by local-parent scope during drain!\n' + f'\n' f'c}}>\n' f' |_{rent_n}\n' f' |_.cancel_scope = {local_cs}\n' @@ -663,7 +671,8 @@ async def drain_to_final_msg( # final result arrived! 
case Return(): log.runtime( - 'Context delivered final draining msg:\n' + f'Context delivered final draining msg\n' + f'\n' f'{pretty_struct.pformat(msg)}' ) ctx._result: Any = pld @@ -697,12 +706,14 @@ async def drain_to_final_msg( ): log.cancel( 'Cancelling `MsgStream` drain since ' - f'{reason}\n\n' + f'{reason}\n' + f'\n' f'<= {ctx.chan.uid}\n' - f' |_{ctx._nsf}()\n\n' + f' |_{ctx._nsf}()\n' + f'\n' f'=> {ctx._task}\n' - f' |_{ctx._stream}\n\n' - + f' |_{ctx._stream}\n' + f'\n' f'{pretty_struct.pformat(msg)}\n' ) break @@ -739,7 +750,8 @@ async def drain_to_final_msg( case Stop(): pre_result_drained.append(msg) log.runtime( # normal/expected shutdown transaction - 'Remote stream terminated due to "stop" msg:\n\n' + f'Remote stream terminated due to "stop" msg\n' + f'\n' f'{pretty_struct.pformat(msg)}\n' ) continue @@ -814,7 +826,8 @@ async def drain_to_final_msg( else: log.cancel( - 'Skipping `MsgStream` drain since final outcome is set\n\n' + f'Skipping `MsgStream` drain since final outcome is set\n' + f'\n' f'{ctx.outcome}\n' ) From defd6e28d28dbce032cdbb925f114004aadeda40 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 25 Jul 2025 19:03:21 -0400 Subject: [PATCH 20/25] Facepalm, actually use `.log.cancel()`-level to report parent-side taskc.. --- tractor/_context.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/_context.py b/tractor/_context.py index e33b9d6f..61994f98 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -2259,7 +2259,7 @@ async def open_context_from_portal( # await debug.pause() # log.cancel( match scope_err: - case trio.Cancelled: + case trio.Cancelled(): logmeth = log.cancel # XXX explicitly report on any non-graceful-taskc cases From 48fbf38c1d30ce9a91de69e134af5a6f69df5ad1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 29 Jul 2025 15:05:38 -0400 Subject: [PATCH 21/25] Drop duplicated (masked) debugging-`terminate_after`, prolly a rebase slip.. 
--- tractor/_spawn.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 1faadd73..408e793c 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -236,10 +236,6 @@ async def hard_kill( # whilst also hacking on it XD # terminate_after: int = 99999, - # NOTE: for mucking with `.pause()`-ing inside the runtime - # whilst also hacking on it XD - # terminate_after: int = 99999, - ) -> None: ''' Un-gracefully terminate an OS level `trio.Process` after timeout. From e275c49b237e3baea333a5d79a8422cbcfe0c075 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 29 Jul 2025 15:18:13 -0400 Subject: [PATCH 22/25] Stackscope import fail msg dun need braces.. --- tractor/devx/_stackscope.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tractor/devx/_stackscope.py b/tractor/devx/_stackscope.py index 84d8a67f..11d2a1ef 100644 --- a/tractor/devx/_stackscope.py +++ b/tractor/devx/_stackscope.py @@ -237,9 +237,9 @@ def enable_stack_on_sig( try: import stackscope except ImportError: - log.error( - '`stackscope` not installed for use in debug mode!\n' - '`Ignoring {enable_stack_on_sig!r} call!\n' + log.warning( + 'The `stackscope` lib is not installed!\n' + '`Ignoring enable_stack_on_sig() call!\n' ) return None From 69e0afccf0bc650af42f48f787adc63a5c97f1ee Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 7 Jul 2025 10:37:02 -0400 Subject: [PATCH 23/25] Use `Address` where possible in (root) actor boot Namely inside various bootup-sequences in `._root` and `._runtime` particularly in the root actor to support both better tpt-address denoting in our logging and as part of clarifying logic around setting the root's registry addresses which is soon to be much better factored out of the core and into an explicit subsystem + API. Some `_root.open_root_actor()` deats, - set `registry_addrs` to a new `uw_reg_addrs` (uw: unwrapped) to be more explicit about wrapped addr types thoughout. 
- instead ensure `registry_addrs` are the wrapped types and pass down into the root `Actor` singleton-instance. - factor the root-actor check + rt-vars update (updating the `'_root_addrs'`) out of `._runtime.async_main()` into this fn. - as previous, set `trans_bind_addrs = uw_reg_addrs` in unwrapped form since it will be passed down both through rt-vars as `'_root_addrs'` and to `._runtime.async_main()` as `accept_addrs` (which is then passed to the IPC server). - adjust/simplify much logging. - shield the `await actor.cancel(None) # self cancel` to avoid any finally-footguns. - as mentioned convert the For `_runtime.async_main()` tweaks, - expect `registry_addrs: list[Address]|None = None` with appropriate unwrapping prior to setting both `.reg_addrs` and the equiv rt-var. - add a new `.registry_addrs` prop for the wrapped form. - convert a final loose-eg for the `service_nursery` to use `collapse_eg()`. - simplify teardown report logging. --- tractor/_root.py | 99 ++++++++++++++++++++++++++++++++------------- tractor/_runtime.py | 78 ++++++++++++++++++----------------- 2 files changed, 112 insertions(+), 65 deletions(-) diff --git a/tractor/_root.py b/tractor/_root.py index 88347132..16d70b98 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -203,7 +203,9 @@ async def open_root_actor( ''' # XXX NEVER allow nested actor-trees!
- if already_actor := _state.current_actor(err_on_no_runtime=False): + if already_actor := _state.current_actor( + err_on_no_runtime=False, + ): rtvs: dict[str, Any] = _state._runtime_vars root_mailbox: list[str, int] = rtvs['_root_mailbox'] registry_addrs: list[list[str, int]] = rtvs['_registry_addrs'] @@ -273,14 +275,20 @@ async def open_root_actor( DeprecationWarning, stacklevel=2, ) - registry_addrs = [arbiter_addr] + uw_reg_addrs = [arbiter_addr] - if not registry_addrs: - registry_addrs: list[UnwrappedAddress] = default_lo_addrs( + uw_reg_addrs = registry_addrs + if not uw_reg_addrs: + uw_reg_addrs: list[UnwrappedAddress] = default_lo_addrs( enable_transports ) - assert registry_addrs + # must exist by now since all below code is dependent + assert uw_reg_addrs + registry_addrs: list[Address] = [ + wrap_address(uw_addr) + for uw_addr in uw_reg_addrs + ] loglevel = ( loglevel @@ -329,10 +337,10 @@ async def open_root_actor( enable_stack_on_sig() # closed into below ping task-func - ponged_addrs: list[UnwrappedAddress] = [] + ponged_addrs: list[Address] = [] async def ping_tpt_socket( - addr: UnwrappedAddress, + addr: Address, timeout: float = 1, ) -> None: ''' @@ -352,17 +360,22 @@ async def open_root_actor( # be better to eventually have a "discovery" protocol # with basic handshake instead? with trio.move_on_after(timeout): - async with _connect_chan(addr): + async with _connect_chan(addr.unwrap()): ponged_addrs.append(addr) except OSError: - # TODO: make this a "discovery" log level? + # ?TODO, make this a "discovery" log level? logger.info( - f'No actor registry found @ {addr}\n' + f'No root-actor registry found @ {addr!r}\n' ) + # !TODO, this is basically just another (abstract) + # happy-eyeballs, so we should try for formalize it somewhere + # in a `.[_]discovery` ya? 
+ # async with trio.open_nursery() as tn: - for addr in registry_addrs: + for uw_addr in uw_reg_addrs: + addr: Address = wrap_address(uw_addr) tn.start_soon( ping_tpt_socket, addr, @@ -391,24 +404,28 @@ async def open_root_actor( loglevel=loglevel, enable_modules=enable_modules, ) - # DO NOT use the registry_addrs as the transport server - # addrs for this new non-registar, root-actor. + # **DO NOT** use the registry_addrs as the + # ipc-transport-server's bind-addrs as this is + # a new NON-registrar, ROOT-actor. + # + # XXX INSTEAD, bind random addrs using the same tpt + # proto. for addr in ponged_addrs: - waddr: Address = wrap_address(addr) trans_bind_addrs.append( - waddr.get_random(bindspace=waddr.bindspace) + addr.get_random( + bindspace=addr.bindspace, + ) ) # Start this local actor as the "registrar", aka a regular # actor who manages the local registry of "mailboxes" of # other process-tree-local sub-actors. else: - # NOTE that if the current actor IS THE REGISTAR, the # following init steps are taken: # - the tranport layer server is bound to each addr # pair defined in provided registry_addrs, or the default. - trans_bind_addrs = registry_addrs + trans_bind_addrs = uw_reg_addrs # - it is normally desirable for any registrar to stay up # indefinitely until either all registered (child/sub) @@ -431,6 +448,16 @@ async def open_root_actor( # `.trio.run()`. actor._infected_aio = _state._runtime_vars['_is_infected_aio'] + # NOTE, only set the loopback addr for the + # process-tree-global "root" mailbox since all sub-actors + # should be able to speak to their root actor over that + # channel. + raddrs: list[Address] = _state._runtime_vars['_root_addrs'] + raddrs.extend(trans_bind_addrs) + # TODO, remove once we have also removed all usage; + # eventually all (root-)registry apis should expect > 1 addr. + _state._runtime_vars['_root_mailbox'] = raddrs[0] + # Start up main task set via core actor-runtime nurseries. 
try: # assign process-local actor @@ -438,13 +465,16 @@ async def open_root_actor( # start local channel-server and fake the portal API # NOTE: this won't block since we provide the nursery - ml_addrs_str: str = '\n'.join( - f'@{addr}' for addr in trans_bind_addrs - ) - logger.info( - f'Starting local {actor.uid} on the following transport addrs:\n' - f'{ml_addrs_str}' - ) + report: str = f'Starting actor-runtime for {actor.aid.reprol()!r}\n' + if reg_addrs := actor.registry_addrs: + report += ( + '-> Opening new registry @ ' + + + '\n'.join( + f'@{addr}' for addr in reg_addrs + ) + ) + logger.info(f'{report}\n') # start the actor runtime in a new task async with trio.open_nursery( @@ -526,9 +556,14 @@ async def open_root_actor( ) logger.info( f'Closing down root actor\n' - f'{op_nested_actor_repr}\n' + f'{op_nested_actor_repr}' ) - await actor.cancel(None) # self cancel + # XXX, THIS IS A *finally-footgun*! + # -> though already shields iternally it can + # taskc here and mask underlying errors raised in + # the try-block above? + with trio.CancelScope(shield=True): + await actor.cancel(None) # self cancel finally: # revert all process-global runtime state if ( @@ -541,10 +576,16 @@ async def open_root_actor( _state._current_actor = None _state._last_actor_terminated = actor - logger.runtime( + sclang_repr: str = _pformat.nest_from_op( + input_op=')>', + text=actor.pformat(), + nest_prefix='|_', + nest_indent=1, + ) + + logger.info( f'Root actor terminated\n' - f')>\n' - f' |_{actor}\n' + f'{sclang_repr}' ) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 95348e86..f3f0714c 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -210,7 +210,7 @@ class Actor: *, enable_modules: list[str] = [], loglevel: str|None = None, - registry_addrs: list[UnwrappedAddress]|None = None, + registry_addrs: list[Address]|None = None, spawn_method: str|None = None, # TODO: remove! 
@@ -253,11 +253,12 @@ class Actor: if arbiter_addr is not None: warnings.warn( '`Actor(arbiter_addr=)` is now deprecated.\n' - 'Use `registry_addrs: list[tuple]` instead.', + 'Use `registry_addrs: list[Address]` instead.', DeprecationWarning, stacklevel=2, ) - registry_addrs: list[UnwrappedAddress] = [arbiter_addr] + + registry_addrs: list[Address] = [wrap_address(arbiter_addr)] # marked by the process spawning backend at startup # will be None for the parent most process started manually @@ -296,8 +297,10 @@ class Actor: # input via the validator. self._reg_addrs: list[UnwrappedAddress] = [] if registry_addrs: - self.reg_addrs: list[UnwrappedAddress] = registry_addrs - _state._runtime_vars['_registry_addrs'] = registry_addrs + _state._runtime_vars['_registry_addrs'] = self.reg_addrs = [ + addr.unwrap() + for addr in registry_addrs + ] @property def aid(self) -> msgtypes.Aid: @@ -469,7 +472,11 @@ class Actor: def reg_addrs(self) -> list[UnwrappedAddress]: ''' List of (socket) addresses for all known (and contactable) - registry actors. + registry-service actors in "unwrapped" (i.e. IPC interchange + wire-compat) form. + + If you are looking for the "wrapped" address form, use + `.registry_addrs` instead. ''' return self._reg_addrs @@ -488,8 +495,14 @@ class Actor: self._reg_addrs = addrs + @property + def registry_addrs(self) -> list[Address]: + return [wrap_address(uw_addr) + for uw_addr in self.reg_addrs] + def load_modules( self, + ) -> None: ''' Load explicitly enabled python modules from local fs after @@ -1362,7 +1375,7 @@ class Actor: Return all IPC channels to the actor with provided `uid`. ''' - return self._peers[uid] + return self._ipc_server._peers[uid] def is_infected_aio(self) -> bool: ''' @@ -1417,6 +1430,8 @@ async def async_main( # establish primary connection with immediate parent actor._parent_chan: Channel|None = None + # is this a sub-actor? + # get runtime info from parent. 
if parent_addr is not None: ( actor._parent_chan, @@ -1462,7 +1477,6 @@ async def async_main( trio.open_nursery( strict_exception_groups=False, ) as service_nursery, - _server.open_ipc_server( parent_tn=service_nursery, stream_handler_tn=service_nursery, @@ -1513,9 +1527,6 @@ async def async_main( # TODO: why is this not with the root nursery? try: - log.runtime( - 'Booting IPC server' - ) eps: list = await ipc_server.listen_on( accept_addrs=accept_addrs, stream_handler_nursery=service_nursery, @@ -1547,18 +1558,6 @@ async def async_main( # TODO, just read direct from ipc_server? accept_addrs: list[UnwrappedAddress] = actor.accept_addrs - # NOTE: only set the loopback addr for the - # process-tree-global "root" mailbox since - # all sub-actors should be able to speak to - # their root actor over that channel. - if _state._runtime_vars['_is_root']: - raddrs: list[Address] = _state._runtime_vars['_root_addrs'] - for addr in accept_addrs: - waddr: Address = wrap_address(addr) - raddrs.append(addr) - else: - _state._runtime_vars['_root_mailbox'] = raddrs[0] - # Register with the arbiter if we're told its addr log.runtime( f'Registering `{actor.name}` => {pformat(accept_addrs)}\n' @@ -1576,6 +1575,7 @@ async def async_main( except AssertionError: await debug.pause() + # !TODO, get rid of the local-portal crap XD async with get_registry(addr) as reg_portal: for accept_addr in accept_addrs: accept_addr = wrap_address(accept_addr) @@ -1614,7 +1614,7 @@ async def async_main( log.runtime( 'Service nursery complete\n' '\n' - '-> Waiting on root nursery to complete' + '->} waiting on root nursery to complete..\n' ) # Blocks here as expected until the root nursery is @@ -1669,6 +1669,7 @@ async def async_main( finally: teardown_report: str = ( 'Main actor-runtime task completed\n' + '\n' ) # ?TODO? should this be in `._entry`/`._root` mods instead? @@ -1710,7 +1711,8 @@ async def async_main( # Unregister actor from the registry-sys / registrar. 
if ( is_registered - and not actor.is_registrar + and + not actor.is_registrar ): failed: bool = False for addr in actor.reg_addrs: @@ -1745,7 +1747,8 @@ async def async_main( ipc_server.has_peers(check_chans=True) ): teardown_report += ( - f'-> Waiting for remaining peers {ipc_server._peers} to clear..\n' + f'-> Waiting for remaining peers to clear..\n' + f' {pformat(ipc_server._peers)}' ) log.runtime(teardown_report) await ipc_server.wait_for_no_more_peers( @@ -1753,20 +1756,23 @@ async def async_main( ) teardown_report += ( - '-> All peer channels are complete\n' + '-]> all peer channels are complete.\n' ) - op_nested_actor_repr: str = _pformat.nest_from_op( - input_op=')> ', - text=actor.pformat(), - nest_prefix='|_', - nest_indent=2, - ) + # op_nested_actor_repr: str = _pformat.nest_from_op( + # input_op=')>', + # text=actor.pformat(), + # nest_prefix='|_', + # nest_indent=1, # under > + # ) teardown_report += ( - 'Actor runtime exited\n' - f'{op_nested_actor_repr}' + '-)> actor runtime main task exit.\n' + # f'{op_nested_actor_repr}' ) - log.info(teardown_report) + # if _state._runtime_vars['_is_root']: + # log.info(teardown_report) + # else: + log.runtime(teardown_report) # TODO: rename to `Registry` and move to `.discovery._registry`! From f53aa992affc0d81705b81d882248e19f37729a4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 29 Jun 2025 15:33:31 -0400 Subject: [PATCH 24/25] .log: expose `at_least_level()` as `StackLevelAdapter` meth --- tractor/log.py | 41 +++++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/tractor/log.py b/tractor/log.py index 393c9571..329562b1 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -81,10 +81,35 @@ BOLD_PALETTE = { } +def at_least_level( + log: Logger|LoggerAdapter, + level: int|str, +) -> bool: + ''' + Predicate to test if a given level is active. 
+ + ''' + if isinstance(level, str): + level: int = CUSTOM_LEVELS[level.upper()] + + if log.getEffectiveLevel() <= level: + return True + return False + + # TODO: this isn't showing the correct '{filename}' # as it did before.. class StackLevelAdapter(LoggerAdapter): + def at_least_level( + self, + level: str, + ) -> bool: + return at_least_level( + log=self, + level=level, + ) + def transport( self, msg: str, @@ -401,19 +426,3 @@ def get_loglevel() -> str: # global module logger for tractor itself log: StackLevelAdapter = get_logger('tractor') - - -def at_least_level( - log: Logger|LoggerAdapter, - level: int|str, -) -> bool: - ''' - Predicate to test if a given level is active. - - ''' - if isinstance(level, str): - level: int = CUSTOM_LEVELS[level.upper()] - - if log.getEffectiveLevel() <= level: - return True - return False From ca427aec7e058970008036d59eb2e7296d0b8c93 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Jul 2025 12:44:46 -0400 Subject: [PATCH 25/25] More prep-to-reduce the `Actor` method-iface - drop the (never/un)used `.get_chans()`. - add #TODO for factoring many methods into a new `.rpc`-subsys/pkg primitive, like an `RPCMngr/Server` type eventually. - add todo to maybe mv `.get_parent()` elsewhere? - move masked `._hard_mofo_kill()` to bottom. --- tractor/_runtime.py | 59 ++++++++++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index f3f0714c..bc915b85 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -549,6 +549,14 @@ class Actor: ) raise + # ?TODO, factor this meth-iface into a new `.rpc` subsys primitive? 
+ # - _get_rpc_func(), + # - _deliver_ctx_payload(), + # - get_context(), + # - start_remote_task(), + # - cancel_rpc_tasks(), + # - _cancel_task(), + # def _get_rpc_func(self, ns, funcname): ''' Try to lookup and return a target RPC func from the @@ -1116,14 +1124,6 @@ class Actor: self._cancel_complete.set() return True - # XXX: hard kill logic if needed? - # def _hard_mofo_kill(self): - # # If we're the root actor or zombied kill everything - # if self._parent_chan is None: # TODO: more robust check - # root = trio.lowlevel.current_root_task() - # for n in root.child_nurseries: - # n.cancel_scope.cancel() - async def _cancel_task( self, cid: str, @@ -1358,25 +1358,13 @@ class Actor: ''' return self.accept_addrs[0] - def get_parent(self) -> Portal: - ''' - Return a `Portal` to our parent. - - ''' - assert self._parent_chan, "No parent channel for this actor?" - return Portal(self._parent_chan) - - def get_chans( - self, - uid: tuple[str, str], - - ) -> list[Channel]: - ''' - Return all IPC channels to the actor with provided `uid`. - - ''' - return self._ipc_server._peers[uid] - + # TODO, this should delegate ONLY to the + # `._spawn_spec._runtime_vars: dict` / `._state` APIs? + # + # XXX, AH RIGHT that's why.. + # it's bc we pass this as a CLI flag to the child.py precisely + # bc we need the bootstrapping pre `async_main()`.. but maybe + # keep this as an impl deat and not part of the pub iface impl? def is_infected_aio(self) -> bool: ''' If `True`, this actor is running `trio` in guest mode on @@ -1387,6 +1375,23 @@ class Actor: ''' return self._infected_aio + # ?TODO, is this the right type for this method? + def get_parent(self) -> Portal: + ''' + Return a `Portal` to our parent. + + ''' + assert self._parent_chan, "No parent channel for this actor?" + return Portal(self._parent_chan) + + # XXX: hard kill logic if needed? 
+ # def _hard_mofo_kill(self): + # # If we're the root actor or zombied kill everything + # if self._parent_chan is None: # TODO: more robust check + # root = trio.lowlevel.current_root_task() + # for n in root.child_nurseries: + # n.cancel_scope.cancel() + async def async_main( actor: Actor,