diff --git a/tests/devx/test_tooling.py b/tests/devx/test_tooling.py index c8cf4c8d..697b2bc1 100644 --- a/tests/devx/test_tooling.py +++ b/tests/devx/test_tooling.py @@ -121,9 +121,11 @@ def test_shield_pause( child.pid, signal.SIGINT, ) + from tractor._supervise import _shutdown_msg expect( child, - 'Shutting down actor runtime', + # 'Shutting down actor runtime', + _shutdown_msg, timeout=6, ) assert_before( diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py index 25935df2..b6d469d9 100644 --- a/tests/test_inter_peer_cancellation.py +++ b/tests/test_inter_peer_cancellation.py @@ -410,7 +410,6 @@ def test_peer_canceller( ''' async def main(): async with tractor.open_nursery( - # NOTE: to halt the peer tasks on ctxc, uncomment this. debug_mode=debug_mode, ) as an: canceller: Portal = await an.start_actor( diff --git a/tractor/_context.py b/tractor/_context.py index 6d817d58..61994f98 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -740,6 +740,8 @@ class Context: # cancelled, NOT their reported canceller. IOW in the # latter case we're cancelled by someone else getting # cancelled. + # + # !TODO, switching to `Actor.aid` here! if (canc := error.canceller) == self._actor.uid: whom: str = 'us' self._canceller = canc @@ -2257,7 +2259,7 @@ async def open_context_from_portal( # await debug.pause() # log.cancel( match scope_err: - case trio.Cancelled: + case trio.Cancelled(): logmeth = log.cancel # XXX explicitly report on any non-graceful-taskc cases diff --git a/tractor/_entry.py b/tractor/_entry.py index 8fffed5f..68e72501 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -21,7 +21,7 @@ Sub-process entry points. 
from __future__ import annotations from functools import partial import multiprocessing as mp -import os +# import os from typing import ( Any, TYPE_CHECKING, @@ -38,6 +38,7 @@ from .devx import ( _frame_stack, pformat, ) +# from .msg import pretty_struct from .to_asyncio import run_as_asyncio_guest from ._addr import UnwrappedAddress from ._runtime import ( @@ -127,20 +128,13 @@ def _trio_main( if actor.loglevel is not None: get_console_log(actor.loglevel) - actor_info: str = ( - f'|_{actor}\n' - f' uid: {actor.uid}\n' - f' pid: {os.getpid()}\n' - f' parent_addr: {parent_addr}\n' - f' loglevel: {actor.loglevel}\n' - ) log.info( - 'Starting new `trio` subactor\n' + f'Starting `trio` subactor from parent @ ' + f'{parent_addr}\n' + pformat.nest_from_op( input_op='>(', # see syntax ideas above - text=actor_info, - nest_indent=2, # since "complete" + text=f'{actor}', ) ) logmeth = log.info @@ -149,7 +143,7 @@ def _trio_main( + pformat.nest_from_op( input_op=')>', # like a "closed-to-play"-icon from super perspective - text=actor_info, + text=f'{actor}', nest_indent=1, ) ) @@ -167,7 +161,7 @@ def _trio_main( + pformat.nest_from_op( input_op='c)>', # closed due to cancel (see above) - text=actor_info, + text=f'{actor}', ) ) except BaseException as err: @@ -177,7 +171,7 @@ def _trio_main( + pformat.nest_from_op( input_op='x)>', # closed by error - text=actor_info, + text=f'{actor}', ) ) # NOTE since we raise a tb will already be shown on the diff --git a/tractor/_portal.py b/tractor/_portal.py index c741df7d..659ddf6d 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -115,6 +115,10 @@ class Portal: @property def chan(self) -> Channel: + ''' + Ref to this ctx's underlying `tractor.ipc.Channel`. 
+ + ''' return self._chan @property @@ -174,10 +178,17 @@ class Portal: # not expecting a "main" result if self._expect_result_ctx is None: + peer_id: str = f'{self.channel.aid.reprol()!r}' log.warning( - f"Portal for {self.channel.aid} not expecting a final" - " result?\nresult() should only be called if subactor" - " was spawned with `ActorNursery.run_in_actor()`") + f'Portal to peer {peer_id} will not deliver a final result?\n' + f'\n' + f'Context.result() can only be called by the parent of ' + f'a sub-actor when it was spawned with ' + f'`ActorNursery.run_in_actor()`' + f'\n' + f'Further this `ActorNursery`-method-API will deprecated in the' + f'near fututre!\n' + ) return NoResult # expecting a "main" result @@ -210,6 +221,7 @@ class Portal: typname: str = type(self).__name__ log.warning( f'`{typname}.result()` is DEPRECATED!\n' + f'\n' f'Use `{typname}.wait_for_result()` instead!\n' ) return await self.wait_for_result( @@ -221,8 +233,10 @@ class Portal: # terminate all locally running async generator # IPC calls if self._streams: - log.cancel( - f"Cancelling all streams with {self.channel.aid}") + peer_id: str = f'{self.channel.aid.reprol()!r}' + report: str = ( + f'Cancelling all msg-streams with {peer_id}\n' + ) for stream in self._streams.copy(): try: await stream.aclose() @@ -231,10 +245,18 @@ class Portal: # (unless of course at some point down the road we # won't expect this to always be the case or need to # detect it for respawning purposes?) 
- log.debug(f"{stream} was already closed.") + report += ( + f'->) {stream!r} already closed\n' + ) + + log.cancel(report) async def aclose(self): - log.debug(f"Closing {self}") + log.debug( + f'Closing portal\n' + f'>}}\n' + f'|_{self}\n' + ) # TODO: once we move to implementing our own `ReceiveChannel` # (including remote task cancellation inside its `.aclose()`) # we'll need to .aclose all those channels here @@ -260,19 +282,18 @@ class Portal: __runtimeframe__: int = 1 # noqa chan: Channel = self.channel + peer_id: str = f'{self.channel.aid.reprol()!r}' if not chan.connected(): log.runtime( - 'This channel is already closed, skipping cancel request..' + 'Peer {peer_id} is already disconnected\n' + '-> skipping cancel request..\n' ) return False - reminfo: str = ( - f'c)=> {self.channel.aid}\n' - f' |_{chan}\n' - ) log.cancel( - f'Requesting actor-runtime cancel for peer\n\n' - f'{reminfo}' + f'Sending actor-runtime-cancel-req to peer\n' + f'\n' + f'c)=> {peer_id}\n' ) # XXX the one spot we set it? @@ -297,8 +318,9 @@ class Portal: # may timeout and we never get an ack (obvi racy) # but that doesn't mean it wasn't cancelled. 
log.debug( - 'May have failed to cancel peer?\n' - f'{reminfo}' + f'May have failed to cancel peer?\n' + f'\n' + f'c)=?> {peer_id}\n' ) # if we get here some weird cancellation case happened @@ -316,22 +338,22 @@ class Portal: TransportClosed, ) as tpt_err: - report: str = ( - f'IPC chan for actor already closed or broken?\n\n' - f'{self.channel.aid}\n' - f' |_{self.channel}\n' + ipc_borked_report: str = ( + f'IPC for actor already closed/broken?\n\n' + f'\n' + f'c)=x> {peer_id}\n' ) match tpt_err: case TransportClosed(): - log.debug(report) + log.debug(ipc_borked_report) case _: - report += ( + ipc_borked_report += ( f'\n' f'Unhandled low-level transport-closed/error during\n' f'Portal.cancel_actor()` request?\n' f'<{type(tpt_err).__name__}( {tpt_err} )>\n' ) - log.warning(report) + log.warning(ipc_borked_report) return False @@ -488,10 +510,13 @@ class Portal: with trio.CancelScope(shield=True): await ctx.cancel() - except trio.ClosedResourceError: + except trio.ClosedResourceError as cre: # if the far end terminates before we send a cancel the # underlying transport-channel may already be closed. - log.cancel(f'Context {ctx} was already closed?') + log.cancel( + f'Context.cancel() -> {cre!r}\n' + f'cid: {ctx.cid!r} already closed?\n' + ) # XXX: should this always be done? # await recv_chan.aclose() diff --git a/tractor/_root.py b/tractor/_root.py index 82bec667..16d70b98 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -47,6 +47,7 @@ from ._runtime import ( from .devx import ( debug, _frame_stack, + pformat as _pformat, ) from . import _spawn from . import _state @@ -202,7 +203,9 @@ async def open_root_actor( ''' # XXX NEVER allow nested actor-trees! 
- if already_actor := _state.current_actor(err_on_no_runtime=False): + if already_actor := _state.current_actor( + err_on_no_runtime=False, + ): rtvs: dict[str, Any] = _state._runtime_vars root_mailbox: list[str, int] = rtvs['_root_mailbox'] registry_addrs: list[list[str, int]] = rtvs['_registry_addrs'] @@ -272,14 +275,20 @@ async def open_root_actor( DeprecationWarning, stacklevel=2, ) - registry_addrs = [arbiter_addr] + uw_reg_addrs = [arbiter_addr] - if not registry_addrs: - registry_addrs: list[UnwrappedAddress] = default_lo_addrs( + uw_reg_addrs = registry_addrs + if not uw_reg_addrs: + uw_reg_addrs: list[UnwrappedAddress] = default_lo_addrs( enable_transports ) - assert registry_addrs + # must exist by now since all below code is dependent + assert uw_reg_addrs + registry_addrs: list[Address] = [ + wrap_address(uw_addr) + for uw_addr in uw_reg_addrs + ] loglevel = ( loglevel @@ -328,10 +337,10 @@ async def open_root_actor( enable_stack_on_sig() # closed into below ping task-func - ponged_addrs: list[UnwrappedAddress] = [] + ponged_addrs: list[Address] = [] async def ping_tpt_socket( - addr: UnwrappedAddress, + addr: Address, timeout: float = 1, ) -> None: ''' @@ -351,17 +360,22 @@ async def open_root_actor( # be better to eventually have a "discovery" protocol # with basic handshake instead? with trio.move_on_after(timeout): - async with _connect_chan(addr): + async with _connect_chan(addr.unwrap()): ponged_addrs.append(addr) except OSError: - # TODO: make this a "discovery" log level? + # ?TODO, make this a "discovery" log level? logger.info( - f'No actor registry found @ {addr}\n' + f'No root-actor registry found @ {addr!r}\n' ) + # !TODO, this is basically just another (abstract) + # happy-eyeballs, so we should try to formalize it somewhere + # in a `.[_]discovery` ya? 
+ # async with trio.open_nursery() as tn: - for addr in registry_addrs: + for uw_addr in uw_reg_addrs: + addr: Address = wrap_address(uw_addr) tn.start_soon( ping_tpt_socket, addr, @@ -390,24 +404,28 @@ async def open_root_actor( loglevel=loglevel, enable_modules=enable_modules, ) - # DO NOT use the registry_addrs as the transport server - # addrs for this new non-registar, root-actor. + # **DO NOT** use the registry_addrs as the + # ipc-transport-server's bind-addrs as this is + # a new NON-registrar, ROOT-actor. + # + # XXX INSTEAD, bind random addrs using the same tpt + # proto. for addr in ponged_addrs: - waddr: Address = wrap_address(addr) trans_bind_addrs.append( - waddr.get_random(bindspace=waddr.bindspace) + addr.get_random( + bindspace=addr.bindspace, + ) ) # Start this local actor as the "registrar", aka a regular # actor who manages the local registry of "mailboxes" of # other process-tree-local sub-actors. else: - # NOTE that if the current actor IS THE REGISTAR, the # following init steps are taken: # - the tranport layer server is bound to each addr # pair defined in provided registry_addrs, or the default. - trans_bind_addrs = registry_addrs + trans_bind_addrs = uw_reg_addrs # - it is normally desirable for any registrar to stay up # indefinitely until either all registered (child/sub) @@ -430,6 +448,16 @@ async def open_root_actor( # `.trio.run()`. actor._infected_aio = _state._runtime_vars['_is_infected_aio'] + # NOTE, only set the loopback addr for the + # process-tree-global "root" mailbox since all sub-actors + # should be able to speak to their root actor over that + # channel. + raddrs: list[Address] = _state._runtime_vars['_root_addrs'] + raddrs.extend(trans_bind_addrs) + # TODO, remove once we have also removed all usage; + # eventually all (root-)registry apis should expect > 1 addr. + _state._runtime_vars['_root_mailbox'] = raddrs[0] + # Start up main task set via core actor-runtime nurseries. 
try: # assign process-local actor @@ -437,13 +465,16 @@ async def open_root_actor( # start local channel-server and fake the portal API # NOTE: this won't block since we provide the nursery - ml_addrs_str: str = '\n'.join( - f'@{addr}' for addr in trans_bind_addrs - ) - logger.info( - f'Starting local {actor.uid} on the following transport addrs:\n' - f'{ml_addrs_str}' - ) + report: str = f'Starting actor-runtime for {actor.aid.reprol()!r}\n' + if reg_addrs := actor.registry_addrs: + report += ( + '-> Opening new registry @ ' + + + '\n'.join( + f'@{addr}' for addr in reg_addrs + ) + ) + logger.info(f'{report}\n') # start the actor runtime in a new task async with trio.open_nursery( @@ -518,12 +549,21 @@ async def open_root_actor( # for an in nurseries: # tempn.start_soon(an.exited.wait) + op_nested_actor_repr: str = _pformat.nest_from_op( + input_op='>) ', + text=actor.pformat(), + nest_prefix='|_', + ) logger.info( f'Closing down root actor\n' - f'>)\n' - f'|_{actor}\n' + f'{op_nested_actor_repr}' ) - await actor.cancel(None) # self cancel + # XXX, THIS IS A *finally-footgun*! + # -> though already shields internally it can + # taskc here and mask underlying errors raised in + # the try-block above? + with trio.CancelScope(shield=True): + await actor.cancel(None) # self cancel finally: # revert all process-global runtime state if ( @@ -536,10 +576,16 @@ async def open_root_actor( _state._current_actor = None _state._last_actor_terminated = actor - logger.runtime( + sclang_repr: str = _pformat.nest_from_op( + input_op=')>', + text=actor.pformat(), + nest_prefix='|_', + nest_indent=1, + ) + + logger.info( f'Root actor terminated\n' - f')>\n' - f' |_{actor}\n' + f'{sclang_repr}' ) diff --git a/tractor/_rpc.py b/tractor/_rpc.py index 2535dcf0..2bd4d6e3 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -64,6 +64,7 @@ from .trionics import ( from .devx import ( debug, add_div, + pformat as _pformat, ) from . 
import _state from .log import get_logger @@ -72,7 +73,7 @@ from .msg import ( MsgCodec, PayloadT, NamespacePath, - # pretty_struct, + pretty_struct, _ops as msgops, ) from tractor.msg.types import ( @@ -220,11 +221,18 @@ async def _invoke_non_context( task_status.started(ctx) result = await coro fname: str = func.__name__ + + op_nested_task: str = _pformat.nest_from_op( + input_op=f')> cid: {ctx.cid!r}', + text=f'{ctx._task}', + nest_indent=1, # under > + ) log.runtime( - 'RPC complete:\n' - f'task: {ctx._task}\n' - f'|_cid={ctx.cid}\n' - f'|_{fname}() -> {pformat(result)}\n' + f'RPC task complete\n' + f'\n' + f'{op_nested_task}\n' + f'\n' + f')> {fname}() -> {pformat(result)}\n' ) # NOTE: only send result if we know IPC isn't down @@ -664,7 +672,8 @@ async def _invoke( ctx._result = res log.runtime( f'Sending result msg and exiting {ctx.side!r}\n' - f'{return_msg}\n' + f'\n' + f'{pretty_struct.pformat(return_msg)}\n' ) await chan.send(return_msg) @@ -832,12 +841,12 @@ async def _invoke( else: descr_str += f'\n{merr!r}\n' else: - descr_str += f'\nand final result {ctx.outcome!r}\n' + descr_str += f'\nwith final result {ctx.outcome!r}\n' logmeth( - message - + - descr_str + f'{message}\n' + f'\n' + f'{descr_str}\n' ) @@ -1004,8 +1013,6 @@ async def process_messages( cid=cid, kwargs=kwargs, ): - kwargs |= {'req_chan': chan} - # XXX NOTE XXX don't start entire actor # runtime cancellation if this actor is # currently in debug mode! @@ -1024,14 +1031,14 @@ async def process_messages( cid, chan, actor.cancel, - kwargs, + kwargs | {'req_chan': chan}, is_rpc=False, return_msg_type=CancelAck, ) log.runtime( - 'Cancelling IPC transport msg-loop with peer:\n' - f'|_{chan}\n' + 'Cancelling RPC-msg-loop with peer\n' + f'->c}} {chan.aid.reprol()}@[{chan.maddr}]\n' ) loop_cs.cancel() break @@ -1044,7 +1051,7 @@ async def process_messages( ): target_cid: str = kwargs['cid'] kwargs |= { - 'requesting_uid': chan.uid, + 'requesting_aid': chan.aid, 'ipc_msg': msg, # XXX NOTE! 
ONLY the rpc-task-owning @@ -1080,21 +1087,34 @@ async def process_messages( ns=ns, func=funcname, kwargs=kwargs, # type-spec this? see `msg.types` - uid=actorid, + uid=actor_uuid, ): + if actor_uuid != chan.aid.uid: + raise RuntimeError( + f'IPC msg <-> chan.aid mismatch!?\n' + f'Channel.aid = {chan.aid!r}\n' + f'Start.uid = {actor_uuid!r}\n' + ) + # await debug.pause() + op_repr: str = 'Start <=) ' + req_repr: str = _pformat.nest_from_op( + input_op=op_repr, + op_suffix='', + nest_prefix='', + text=f'{chan}', + + nest_indent=len(op_repr)-1, + rm_from_first_ln='<', + # ^XXX, subtract -1 to account for + # > {actor.uid}\n' - f' |_{actor}\n' - f' -> nsp: `{ns}.{funcname}({kwargs})`\n' - - # f' |_{ns}.{funcname}({kwargs})\n\n' - - # f'{pretty_struct.pformat(msg)}\n' + 'Handling RPC request\n' + f'{req_repr}\n' + f'\n' + f'->{{ ipc-context-id: {cid!r}\n' + f'->{{ nsp for fn: `{ns}.{funcname}({kwargs})`\n' ) # runtime-internal endpoint: `Actor.` @@ -1123,10 +1143,6 @@ async def process_messages( await chan.send(err_msg) continue - start_status += ( - f' -> func: {func}\n' - ) - # schedule a task for the requested RPC function # in the actor's main "service nursery". # @@ -1134,7 +1150,7 @@ async def process_messages( # supervision isolation? would avoid having to # manage RPC tasks individually in `._rpc_tasks` # table? - start_status += ' -> scheduling new task..\n' + start_status += '->( scheduling new task..\n' log.runtime(start_status) try: ctx: Context = await actor._service_n.start( @@ -1218,12 +1234,24 @@ async def process_messages( # END-OF `async for`: # IPC disconnected via `trio.EndOfChannel`, likely # due to a (graceful) `Channel.aclose()`. 
+ + chan_op_repr: str = '<=x] ' + chan_repr: str = _pformat.nest_from_op( + input_op=chan_op_repr, + op_suffix='', + nest_prefix='', + text=chan.pformat(), + nest_indent=len(chan_op_repr)-1, + rm_from_first_ln='<', + ) log.runtime( - f'channel for {chan.uid} disconnected, cancelling RPC tasks\n' - f'|_{chan}\n' + f'IPC channel disconnected\n' + f'{chan_repr}\n' + f'\n' + f'->c) cancelling RPC tasks.\n' ) await actor.cancel_rpc_tasks( - req_uid=actor.uid, + req_aid=actor.aid, # a "self cancel" in terms of the lifetime of the # IPC connection which is presumed to be the # source of any requests for spawned tasks. @@ -1295,13 +1323,37 @@ async def process_messages( finally: # msg debugging for when he machinery is brokey if msg is None: - message: str = 'Exiting IPC msg loop without receiving a msg?' + message: str = 'Exiting RPC-loop without receiving a msg?' else: + task_op_repr: str = ')>' + task: trio.Task = trio.lowlevel.current_task() + + # maybe add cancelled opt prefix + if task._cancel_status.effectively_cancelled: + task_op_repr = 'c' + task_op_repr + + task_repr: str = _pformat.nest_from_op( + input_op=task_op_repr, + text=f'{task!r}', + nest_indent=1, + ) + # chan_op_repr: str = '<=} ' + # chan_repr: str = _pformat.nest_from_op( + # input_op=chan_op_repr, + # op_suffix='', + # nest_prefix='', + # text=chan.pformat(), + # nest_indent=len(chan_op_repr)-1, + # rm_from_first_ln='<', + # ) message: str = ( - 'Exiting IPC msg loop with final msg\n\n' - f'<= peer: {chan.uid}\n' - f' |_{chan}\n\n' - # f'{pretty_struct.pformat(msg)}' + f'Exiting RPC-loop with final msg\n' + f'\n' + # f'{chan_repr}\n' + f'{task_repr}\n' + f'\n' + f'{pretty_struct.pformat(msg)}' + f'\n' ) log.runtime(message) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 758e5685..bc915b85 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -55,6 +55,7 @@ from typing import ( TYPE_CHECKING, ) import uuid +import textwrap from types import ModuleType import warnings @@ -97,7 
+98,10 @@ from ._exceptions import ( MsgTypeError, unpack_error, ) -from .devx import debug +from .devx import ( + debug, + pformat as _pformat +) from ._discovery import get_registry from ._portal import Portal from . import _state @@ -206,7 +210,7 @@ class Actor: *, enable_modules: list[str] = [], loglevel: str|None = None, - registry_addrs: list[UnwrappedAddress]|None = None, + registry_addrs: list[Address]|None = None, spawn_method: str|None = None, # TODO: remove! @@ -227,7 +231,7 @@ class Actor: # state self._cancel_complete = trio.Event() - self._cancel_called_by_remote: tuple[str, tuple]|None = None + self._cancel_called_by: tuple[str, tuple]|None = None self._cancel_called: bool = False # retreive and store parent `__main__` data which @@ -249,11 +253,12 @@ class Actor: if arbiter_addr is not None: warnings.warn( '`Actor(arbiter_addr=)` is now deprecated.\n' - 'Use `registry_addrs: list[tuple]` instead.', + 'Use `registry_addrs: list[Address]` instead.', DeprecationWarning, stacklevel=2, ) - registry_addrs: list[UnwrappedAddress] = [arbiter_addr] + + registry_addrs: list[Address] = [wrap_address(arbiter_addr)] # marked by the process spawning backend at startup # will be None for the parent most process started manually @@ -292,8 +297,10 @@ class Actor: # input via the validator. 
self._reg_addrs: list[UnwrappedAddress] = [] if registry_addrs: - self.reg_addrs: list[UnwrappedAddress] = registry_addrs - _state._runtime_vars['_registry_addrs'] = registry_addrs + _state._runtime_vars['_registry_addrs'] = self.reg_addrs = [ + addr.unwrap() + for addr in registry_addrs + ] @property def aid(self) -> msgtypes.Aid: @@ -339,46 +346,125 @@ class Actor: def pid(self) -> int: return self._aid.pid - def pformat(self) -> str: - ds: str = '=' - parent_uid: tuple|None = None - if rent_chan := self._parent_chan: - parent_uid = rent_chan.uid + @property + def repr_state(self) -> str: + if self.cancel_complete: + return 'cancelled' + + elif canceller := self.cancel_caller: + return f' and cancel-called by {canceller}' + + else: + return 'running' + + def pformat( + self, + ds: str = ':', + indent: int = 0, + privates: bool = False, + ) -> str: + + fmtstr: str = f'|_id: {self.aid.reprol()!r}\n' + if privates: + aid_nest_prefix: str = '|_aid=' + aid_field_repr: str = _pformat.nest_from_op( + input_op='', + text=pretty_struct.pformat( + struct=self.aid, + field_indent=2, + ), + op_suffix='', + nest_prefix=aid_nest_prefix, + nest_indent=0, + ) + fmtstr: str = f'{aid_field_repr}' + + if rent_chan := self._parent_chan: + fmtstr += ( + f"|_parent{ds}{rent_chan.aid.reprol()}\n" + ) - peers: list = [] server: _server.IPCServer = self.ipc_server if server: - peers: list[tuple] = list(server._peer_connected) + if privates: + server_repr: str = self._ipc_server.pformat( + privates=privates, + ) + # create field ln as a key-header indented under + # and up to the section's key prefix. 
+ # ^XXX if we were to indent `repr(Server)` to + # ': ' + # _here_^ + server_repr: str = _pformat.nest_from_op( + input_op='', # nest as sub-obj + op_suffix='', + text=server_repr, + ) + fmtstr += ( + f"{server_repr}" + ) + else: + fmtstr += ( + f'|_ipc: {server.repr_state!r}\n' + ) - fmtstr: str = ( - f' |_id: {self.aid!r}\n' - # f" aid{ds}{self.aid!r}\n" - f" parent{ds}{parent_uid}\n" - f'\n' - f' |_ipc: {len(peers)!r} connected peers\n' - f" peers{ds}{peers!r}\n" - f" ipc_server{ds}{self._ipc_server}\n" - f'\n' - f' |_rpc: {len(self._rpc_tasks)} tasks\n' - f" ctxs{ds}{len(self._contexts)}\n" - f'\n' - f' |_runtime: ._task{ds}{self._task!r}\n' - f' _spawn_method{ds}{self._spawn_method}\n' - f' _actoruid2nursery{ds}{self._actoruid2nursery}\n' - f' _forkserver_info{ds}{self._forkserver_info}\n' - f'\n' - f' |_state: "TODO: .repr_state()"\n' - f' _cancel_complete{ds}{self._cancel_complete}\n' - f' _cancel_called_by_remote{ds}{self._cancel_called_by_remote}\n' - f' _cancel_called{ds}{self._cancel_called}\n' + fmtstr += ( + f'|_rpc: {len(self._rpc_tasks)} active tasks\n' ) - return ( - '\n' + + # TODO, actually fix the .repr_state impl/output? + # append ipc-ctx state summary + # ctxs: dict = self._contexts + # if ctxs: + # ctx_states: dict[str, int] = {} + # for ctx in self._contexts.values(): + # ctx_state: str = ctx.repr_state + # cnt = ctx_states.setdefault(ctx_state, 0) + # ctx_states[ctx_state] = cnt + 1 + + # fmtstr += ( + # f" ctxs{ds}{ctx_states}\n" + # ) + + # runtime-state + task_name: str = '' + if task := self._task: + task_name: str = task.name + fmtstr += ( + # TODO, this just like ctx? 
+ f'|_state: {self.repr_state!r}\n' + f' task: {task_name}\n' + f' loglevel: {self.loglevel!r}\n' + f' subactors_spawned: {len(self._actoruid2nursery)}\n' ) + if not _state.is_root_process(): + fmtstr += f' spawn_method: {self._spawn_method!r}\n' + + if privates: + fmtstr += ( + # f' actoruid2nursery{ds}{self._actoruid2nursery}\n' + f' cancel_complete{ds}{self._cancel_complete}\n' + f' cancel_called_by_remote{ds}{self._cancel_called_by}\n' + f' cancel_called{ds}{self._cancel_called}\n' + ) + + if fmtstr: + fmtstr: str = textwrap.indent( + text=fmtstr, + prefix=' '*(1 + indent), + ) + + _repr: str = ( + f'<{type(self).__name__}(\n' + f'{fmtstr}' + f')>\n' + ) + if indent: + _repr: str = textwrap.indent( + text=_repr, + prefix=' '*indent, + ) + return _repr __repr__ = pformat @@ -386,7 +472,11 @@ class Actor: def reg_addrs(self) -> list[UnwrappedAddress]: ''' List of (socket) addresses for all known (and contactable) - registry actors. + registry-service actors in "unwrapped" (i.e. IPC interchange + wire-compat) form. + + If you are looking for the "wrapped" address form, use + `.registry_addrs` instead. ''' return self._reg_addrs @@ -405,8 +495,14 @@ class Actor: self._reg_addrs = addrs + @property + def registry_addrs(self) -> list[Address]: + return [wrap_address(uw_addr) + for uw_addr in self.reg_addrs] + def load_modules( self, + ) -> None: ''' Load explicitly enabled python modules from local fs after @@ -453,6 +549,14 @@ class Actor: ) raise + # ?TODO, factor this meth-iface into a new `.rpc` subsys primitive? + # - _get_rpc_func(), + # - _deliver_ctx_payload(), + # - get_context(), + # - start_remote_task(), + # - cancel_rpc_tasks(), + # - _cancel_task(), + # def _get_rpc_func(self, ns, funcname): ''' Try to lookup and return a target RPC func from the @@ -496,11 +600,11 @@ class Actor: queue. 
''' - uid: tuple[str, str] = chan.uid - assert uid, f"`chan.uid` can't be {uid}" + aid: msgtypes.Aid = chan.aid + assert aid, f"`chan.aid` can't be {aid}" try: ctx: Context = self._contexts[( - uid, + aid.uid, cid, # TODO: how to determine this tho? @@ -511,7 +615,7 @@ class Actor: 'Ignoring invalid IPC msg!?\n' f'Ctx seems to not/no-longer exist??\n' f'\n' - f'<=? {uid}\n' + f'<=? {aid.reprol()!r}\n' f' |_{pretty_struct.pformat(msg)}\n' ) match msg: @@ -560,6 +664,7 @@ class Actor: msging session's lifetime. ''' + # ?TODO, use Aid here as well? actor_uid = chan.uid assert actor_uid try: @@ -908,6 +1013,22 @@ class Actor: None, # self cancel all rpc tasks ) + @property + def cancel_complete(self) -> bool: + return self._cancel_complete.is_set() + + @property + def cancel_called(self) -> bool: + ''' + Was this actor requested to cancel by a remote peer actor. + + ''' + return self._cancel_called_by is not None + + @property + def cancel_caller(self) -> msgtypes.Aid|None: + return self._cancel_called_by + async def cancel( self, @@ -932,20 +1053,18 @@ class Actor: ''' ( - requesting_uid, - requester_type, + requesting_aid, # Aid + requester_type, # str req_chan, log_meth, ) = ( - req_chan.uid, + req_chan.aid, 'peer', req_chan, log.cancel, - ) if req_chan else ( - # a self cancel of ALL rpc tasks - self.uid, + self.aid, 'self', self, log.runtime, @@ -953,14 +1072,14 @@ class Actor: # TODO: just use the new `Context.repr_rpc: str` (and # other) repr fields instead of doing this all manual.. msg: str = ( - f'Actor-runtime cancel request from {requester_type}\n\n' - f'<=c) {requesting_uid}\n' - f' |_{self}\n' + f'Actor-runtime cancel request from {requester_type!r}\n' f'\n' + f'<=c)\n' + f'{self}' ) # TODO: what happens here when we self-cancel tho? 
- self._cancel_called_by_remote: tuple = requesting_uid + self._cancel_called_by: tuple = requesting_aid self._cancel_called = True # cancel all ongoing rpc tasks @@ -988,7 +1107,7 @@ class Actor: # self-cancel **all** ongoing RPC tasks await self.cancel_rpc_tasks( - req_uid=requesting_uid, + req_aid=requesting_aid, parent_chan=None, ) @@ -1005,19 +1124,11 @@ class Actor: self._cancel_complete.set() return True - # XXX: hard kill logic if needed? - # def _hard_mofo_kill(self): - # # If we're the root actor or zombied kill everything - # if self._parent_chan is None: # TODO: more robust check - # root = trio.lowlevel.current_root_task() - # for n in root.child_nurseries: - # n.cancel_scope.cancel() - async def _cancel_task( self, cid: str, parent_chan: Channel, - requesting_uid: tuple[str, str]|None, + requesting_aid: msgtypes.Aid|None, ipc_msg: dict|None|bool = False, @@ -1055,7 +1166,7 @@ class Actor: log.runtime( 'Cancel request for invalid RPC task.\n' 'The task likely already completed or was never started!\n\n' - f'<= canceller: {requesting_uid}\n' + f'<= canceller: {requesting_aid}\n' f'=> {cid}@{parent_chan.uid}\n' f' |_{parent_chan}\n' ) @@ -1063,9 +1174,12 @@ class Actor: log.cancel( 'Rxed cancel request for RPC task\n' - f'<=c) {requesting_uid}\n' - f' |_{ctx._task}\n' - f' >> {ctx.repr_rpc}\n' + f'{ctx._task!r} <=c) {requesting_aid}\n' + f'|_>> {ctx.repr_rpc}\n' + + # f'|_{ctx._task}\n' + # f' >> {ctx.repr_rpc}\n' + # f'=> {ctx._task}\n' # f' >> Actor._cancel_task() => {ctx._task}\n' # f' |_ {ctx._task}\n\n' @@ -1086,9 +1200,9 @@ class Actor: ) if ( ctx._canceller is None - and requesting_uid + and requesting_aid ): - ctx._canceller: tuple = requesting_uid + ctx._canceller: tuple = requesting_aid.uid # TODO: pack the RPC `{'cmd': }` msg into a ctxc and # then raise and pack it here? 
@@ -1114,7 +1228,7 @@ class Actor: # wait for _invoke to mark the task complete flow_info: str = ( - f'<= canceller: {requesting_uid}\n' + f'<= canceller: {requesting_aid}\n' f'=> ipc-parent: {parent_chan}\n' f'|_{ctx}\n' ) @@ -1131,7 +1245,7 @@ class Actor: async def cancel_rpc_tasks( self, - req_uid: tuple[str, str], + req_aid: msgtypes.Aid, # NOTE: when None is passed we cancel **all** rpc # tasks running in this actor! @@ -1148,7 +1262,7 @@ class Actor: if not tasks: log.runtime( 'Actor has no cancellable RPC tasks?\n' - f'<= canceller: {req_uid}\n' + f'<= canceller: {req_aid.reprol()}\n' ) return @@ -1188,7 +1302,7 @@ class Actor: ) log.cancel( f'Cancelling {descr} RPC tasks\n\n' - f'<=c) {req_uid} [canceller]\n' + f'<=c) {req_aid} [canceller]\n' f'{rent_chan_repr}' f'c)=> {self.uid} [cancellee]\n' f' |_{self} [with {len(tasks)} tasks]\n' @@ -1216,7 +1330,7 @@ class Actor: await self._cancel_task( cid, task_caller_chan, - requesting_uid=req_uid, + requesting_aid=req_aid, ) if tasks: @@ -1244,25 +1358,13 @@ class Actor: ''' return self.accept_addrs[0] - def get_parent(self) -> Portal: - ''' - Return a `Portal` to our parent. - - ''' - assert self._parent_chan, "No parent channel for this actor?" - return Portal(self._parent_chan) - - def get_chans( - self, - uid: tuple[str, str], - - ) -> list[Channel]: - ''' - Return all IPC channels to the actor with provided `uid`. - - ''' - return self._peers[uid] - + # TODO, this should delegate ONLY to the + # `._spawn_spec._runtime_vars: dict` / `._state` APIs? + # + # XXX, AH RIGHT that's why.. + # it's bc we pass this as a CLI flag to the child.py precisely + # bc we need the bootstrapping pre `async_main()`.. but maybe + # keep this as an impl deat and not part of the pub iface impl? def is_infected_aio(self) -> bool: ''' If `True`, this actor is running `trio` in guest mode on @@ -1273,6 +1375,23 @@ class Actor: ''' return self._infected_aio + # ?TODO, is this the right type for this method? 
+ def get_parent(self) -> Portal: + ''' + Return a `Portal` to our parent. + + ''' + assert self._parent_chan, "No parent channel for this actor?" + return Portal(self._parent_chan) + + # XXX: hard kill logic if needed? + # def _hard_mofo_kill(self): + # # If we're the root actor or zombied kill everything + # if self._parent_chan is None: # TODO: more robust check + # root = trio.lowlevel.current_root_task() + # for n in root.child_nurseries: + # n.cancel_scope.cancel() + async def async_main( actor: Actor, @@ -1316,6 +1435,8 @@ async def async_main( # establish primary connection with immediate parent actor._parent_chan: Channel|None = None + # is this a sub-actor? + # get runtime info from parent. if parent_addr is not None: ( actor._parent_chan, @@ -1361,7 +1482,6 @@ async def async_main( trio.open_nursery( strict_exception_groups=False, ) as service_nursery, - _server.open_ipc_server( parent_tn=service_nursery, stream_handler_tn=service_nursery, @@ -1412,9 +1532,6 @@ async def async_main( # TODO: why is this not with the root nursery? try: - log.runtime( - 'Booting IPC server' - ) eps: list = await ipc_server.listen_on( accept_addrs=accept_addrs, stream_handler_nursery=service_nursery, @@ -1446,18 +1563,6 @@ async def async_main( # TODO, just read direct from ipc_server? accept_addrs: list[UnwrappedAddress] = actor.accept_addrs - # NOTE: only set the loopback addr for the - # process-tree-global "root" mailbox since - # all sub-actors should be able to speak to - # their root actor over that channel. 
- if _state._runtime_vars['_is_root']: - raddrs: list[Address] = _state._runtime_vars['_root_addrs'] - for addr in accept_addrs: - waddr: Address = wrap_address(addr) - raddrs.append(addr) - else: - _state._runtime_vars['_root_mailbox'] = raddrs[0] - # Register with the arbiter if we're told its addr log.runtime( f'Registering `{actor.name}` => {pformat(accept_addrs)}\n' @@ -1475,6 +1580,7 @@ async def async_main( except AssertionError: await debug.pause() + # !TODO, get rid of the local-portal crap XD async with get_registry(addr) as reg_portal: for accept_addr in accept_addrs: accept_addr = wrap_address(accept_addr) @@ -1511,8 +1617,9 @@ async def async_main( # 'Blocking on service nursery to exit..\n' ) log.runtime( - "Service nursery complete\n" - "Waiting on root nursery to complete" + 'Service nursery complete\n' + '\n' + '->} waiting on root nursery to complete..\n' ) # Blocks here as expected until the root nursery is @@ -1567,6 +1674,7 @@ async def async_main( finally: teardown_report: str = ( 'Main actor-runtime task completed\n' + '\n' ) # ?TODO? should this be in `._entry`/`._root` mods instead? @@ -1608,7 +1716,8 @@ async def async_main( # Unregister actor from the registry-sys / registrar. 
if ( is_registered - and not actor.is_registrar + and + not actor.is_registrar ): failed: bool = False for addr in actor.reg_addrs: @@ -1643,7 +1752,8 @@ async def async_main( ipc_server.has_peers(check_chans=True) ): teardown_report += ( - f'-> Waiting for remaining peers {ipc_server._peers} to clear..\n' + f'-> Waiting for remaining peers to clear..\n' + f' {pformat(ipc_server._peers)}' ) log.runtime(teardown_report) await ipc_server.wait_for_no_more_peers( @@ -1651,15 +1761,23 @@ async def async_main( ) teardown_report += ( - '-> All peer channels are complete\n' + '-]> all peer channels are complete.\n' ) + # op_nested_actor_repr: str = _pformat.nest_from_op( + # input_op=')>', + # text=actor.pformat(), + # nest_prefix='|_', + # nest_indent=1, # under > + # ) teardown_report += ( - 'Actor runtime exiting\n' - f'>)\n' - f'|_{actor}\n' + '-)> actor runtime main task exit.\n' + # f'{op_nested_actor_repr}' ) - log.info(teardown_report) + # if _state._runtime_vars['_is_root']: + # log.info(teardown_report) + # else: + log.runtime(teardown_report) # TODO: rename to `Registry` and move to `.discovery._registry`! 
diff --git a/tractor/_spawn.py b/tractor/_spawn.py index a3e3194e..408e793c 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -34,9 +34,9 @@ from typing import ( import trio from trio import TaskStatus -from .devx.debug import ( - maybe_wait_for_debugger, - acquire_debug_lock, +from .devx import ( + debug, + pformat as _pformat ) from tractor._state import ( current_actor, @@ -51,14 +51,17 @@ from tractor._portal import Portal from tractor._runtime import Actor from tractor._entry import _mp_main from tractor._exceptions import ActorFailure -from tractor.msg.types import ( - Aid, - SpawnSpec, +from tractor.msg import ( + types as msgtypes, + pretty_struct, ) if TYPE_CHECKING: - from ipc import IPCServer + from ipc import ( + _server, + Channel, + ) from ._supervise import ActorNursery ProcessType = TypeVar('ProcessType', mp.Process, trio.Process) @@ -233,10 +236,6 @@ async def hard_kill( # whilst also hacking on it XD # terminate_after: int = 99999, - # NOTE: for mucking with `.pause()`-ing inside the runtime - # whilst also hacking on it XD - # terminate_after: int = 99999, - ) -> None: ''' Un-gracefully terminate an OS level `trio.Process` after timeout. @@ -328,20 +327,21 @@ async def soft_kill( see `.hard_kill()`). ''' - peer_aid: Aid = portal.channel.aid + chan: Channel = portal.channel + peer_aid: msgtypes.Aid = chan.aid try: log.cancel( f'Soft killing sub-actor via portal request\n' f'\n' - f'(c=> {peer_aid}\n' - f' |_{proc}\n' + f'c)=> {peer_aid.reprol()}@[{chan.maddr}]\n' + f' |_{proc}\n' ) # wait on sub-proc to signal termination await wait_func(proc) except trio.Cancelled: with trio.CancelScope(shield=True): - await maybe_wait_for_debugger( + await debug.maybe_wait_for_debugger( child_in_debug=_runtime_vars.get( '_debug_mode', False ), @@ -465,7 +465,7 @@ async def trio_proc( "--uid", # TODO, how to pass this over "wire" encodings like # cmdline args? - # -[ ] maybe we can add an `Aid.min_tuple()` ? 
+ # -[ ] maybe we can add an `msgtypes.Aid.min_tuple()` ? str(subactor.uid), # Address the child must connect to on startup "--parent_addr", @@ -483,13 +483,14 @@ async def trio_proc( cancelled_during_spawn: bool = False proc: trio.Process|None = None - ipc_server: IPCServer = actor_nursery._actor.ipc_server + ipc_server: _server.Server = actor_nursery._actor.ipc_server try: try: proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs) log.runtime( - 'Started new child\n' - f'|_{proc}\n' + f'Started new child subproc\n' + f'(>\n' + f' |_{proc}\n' ) # wait for actor to spawn and connect back to us @@ -507,10 +508,10 @@ async def trio_proc( with trio.CancelScope(shield=True): # don't clobber an ongoing pdb if is_root_process(): - await maybe_wait_for_debugger() + await debug.maybe_wait_for_debugger() elif proc is not None: - async with acquire_debug_lock(subactor.uid): + async with debug.acquire_debug_lock(subactor.uid): # soft wait on the proc to terminate with trio.move_on_after(0.5): await proc.wait() @@ -528,14 +529,19 @@ async def trio_proc( # send a "spawning specification" which configures the # initial runtime state of the child. - sspec = SpawnSpec( + sspec = msgtypes.SpawnSpec( _parent_main_data=subactor._parent_main_data, enable_modules=subactor.enable_modules, reg_addrs=subactor.reg_addrs, bind_addrs=bind_addrs, _runtime_vars=_runtime_vars, ) - log.runtime(f'Sending spawn spec: {str(sspec)}') + log.runtime( + f'Sending spawn spec to child\n' + f'{{}}=> {chan.aid.reprol()!r}\n' + f'\n' + f'{pretty_struct.pformat(sspec)}\n' + ) await chan.send(sspec) # track subactor in current nursery @@ -563,7 +569,7 @@ async def trio_proc( # condition. await soft_kill( proc, - trio.Process.wait, + trio.Process.wait, # XXX, uses `pidfd_open()` below. 
portal ) @@ -571,8 +577,7 @@ async def trio_proc( # tandem if not done already log.cancel( 'Cancelling portal result reaper task\n' - f'>c)\n' - f' |_{subactor.uid}\n' + f'c)> {subactor.aid.reprol()!r}\n' ) nursery.cancel_scope.cancel() @@ -581,21 +586,24 @@ async def trio_proc( # allowed! Do this **after** cancellation/teardown to avoid # killing the process too early. if proc: + reap_repr: str = _pformat.nest_from_op( + input_op='>x)', + text=subactor.pformat(), + ) log.cancel( f'Hard reap sequence starting for subactor\n' - f'>x)\n' - f' |_{subactor}@{subactor.uid}\n' + f'{reap_repr}' ) with trio.CancelScope(shield=True): # don't clobber an ongoing pdb if cancelled_during_spawn: # Try again to avoid TTY clobbering. - async with acquire_debug_lock(subactor.uid): + async with debug.acquire_debug_lock(subactor.uid): with trio.move_on_after(0.5): await proc.wait() - await maybe_wait_for_debugger( + await debug.maybe_wait_for_debugger( child_in_debug=_runtime_vars.get( '_debug_mode', False ), @@ -624,7 +632,7 @@ async def trio_proc( # acquire the lock and get notified of who has it, # check that uid against our known children? 
# this_uid: tuple[str, str] = current_actor().uid - # await acquire_debug_lock(this_uid) + # await debug.acquire_debug_lock(this_uid) if proc.poll() is None: log.cancel(f"Attempting to hard kill {proc}") @@ -727,7 +735,7 @@ async def mp_proc( log.runtime(f"Started {proc}") - ipc_server: IPCServer = actor_nursery._actor.ipc_server + ipc_server: _server.Server = actor_nursery._actor.ipc_server try: # wait for actor to spawn and connect back to us # channel should have handshake completed by the diff --git a/tractor/_supervise.py b/tractor/_supervise.py index e1775292..9fdad8ce 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -21,7 +21,6 @@ from contextlib import asynccontextmanager as acm from functools import partial import inspect -from pprint import pformat from typing import ( TYPE_CHECKING, ) @@ -31,7 +30,10 @@ import warnings import trio -from .devx.debug import maybe_wait_for_debugger +from .devx import ( + debug, + pformat as _pformat, +) from ._addr import ( UnwrappedAddress, mk_uuid, @@ -199,7 +201,7 @@ class ActorNursery: loglevel=loglevel, # verbatim relay this actor's registrar addresses - registry_addrs=current_actor().reg_addrs, + registry_addrs=current_actor().registry_addrs, ) parent_addr: UnwrappedAddress = self._actor.accept_addr assert parent_addr @@ -453,7 +455,7 @@ async def _open_and_supervise_one_cancels_all_nursery( # the "hard join phase". log.runtime( 'Waiting on subactors to complete:\n' - f'{pformat(an._children)}\n' + f'>}} {len(an._children)}\n' ) an._join_procs.set() @@ -467,7 +469,7 @@ async def _open_and_supervise_one_cancels_all_nursery( # will make the pdb repl unusable. # Instead try to wait for pdb to be released before # tearing down. 
- await maybe_wait_for_debugger( + await debug.maybe_wait_for_debugger( child_in_debug=an._at_least_one_child_in_debug ) @@ -543,7 +545,7 @@ async def _open_and_supervise_one_cancels_all_nursery( # XXX: yet another guard before allowing the cancel # sequence in case a (single) child is in debug. - await maybe_wait_for_debugger( + await debug.maybe_wait_for_debugger( child_in_debug=an._at_least_one_child_in_debug ) @@ -592,6 +594,11 @@ async def _open_and_supervise_one_cancels_all_nursery( # final exit +_shutdown_msg: str = ( + 'Actor-runtime-shutdown' +) + + @acm # @api_frame async def open_nursery( @@ -679,17 +686,26 @@ async def open_nursery( ): __tracebackhide__: bool = False - msg: str = ( - 'Actor-nursery exited\n' - f'|_{an}\n' + + op_nested_an_repr: str = _pformat.nest_from_op( + input_op=')>', + text=f'{an}', + # nest_prefix='|_', + nest_indent=1, # under > ) + an_msg: str = ( + f'Actor-nursery exited\n' + f'{op_nested_an_repr}\n' + ) + # keep noise low during std operation. + log.runtime(an_msg) if implicit_runtime: # shutdown runtime if it was started and report noisly # that we're did so. - msg += '=> Shutting down actor runtime <=\n' + msg: str = ( + '\n' + '\n' + f'{_shutdown_msg} )>\n' + ) log.info(msg) - - else: - # keep noise low during std operation. 
- log.runtime(msg) diff --git a/tractor/devx/_stackscope.py b/tractor/devx/_stackscope.py index 84d8a67f..11d2a1ef 100644 --- a/tractor/devx/_stackscope.py +++ b/tractor/devx/_stackscope.py @@ -237,9 +237,9 @@ def enable_stack_on_sig( try: import stackscope except ImportError: - log.error( - '`stackscope` not installed for use in debug mode!\n' - '`Ignoring {enable_stack_on_sig!r} call!\n' + log.warning( + 'The `stackscope` lib is not installed!\n' + '`Ignoring enable_stack_on_sig() call!\n' ) return None diff --git a/tractor/ipc/_chan.py b/tractor/ipc/_chan.py index 2c3374c2..64643d95 100644 --- a/tractor/ipc/_chan.py +++ b/tractor/ipc/_chan.py @@ -171,11 +171,23 @@ class Channel: ) assert transport.raddr == addr chan = Channel(transport=transport) - log.runtime( - f'Connected channel IPC transport\n' - f'[>\n' - f' |_{chan}\n' - ) + + # ?TODO, compact this into adapter level-methods? + # -[ ] would avoid extra repr-calcs if level not active? + # |_ how would the `calc_if_level` look though? func? + if log.at_least_level('runtime'): + from tractor.devx import ( + pformat as _pformat, + ) + chan_repr: str = _pformat.nest_from_op( + input_op='[>', + text=chan.pformat(), + nest_indent=1, + ) + log.runtime( + f'Connected channel IPC transport\n' + f'{chan_repr}' + ) return chan @cm @@ -196,9 +208,12 @@ class Channel: self._transport.codec = orig # TODO: do a .src/.dst: str for maddrs? 
- def pformat(self) -> str: + def pformat( + self, + privates: bool = False, + ) -> str: if not self._transport: - return '' + return '' tpt: MsgTransport = self._transport tpt_name: str = type(tpt).__name__ @@ -206,26 +221,35 @@ class Channel: 'connected' if self.connected() else 'closed' ) - return ( + repr_str: str = ( f'\n' + ) + ( f' |_msgstream: {tpt_name}\n' - f' proto={tpt.laddr.proto_key!r}\n' - f' layer={tpt.layer_key!r}\n' - f' laddr={tpt.laddr}\n' - f' raddr={tpt.raddr}\n' - f' codec={tpt.codec_key!r}\n' - f' stream={tpt.stream}\n' - f' maddr={tpt.maddr!r}\n' - f' drained={tpt.drained}\n' + f' maddr: {tpt.maddr!r}\n' + f' proto: {tpt.laddr.proto_key!r}\n' + f' layer: {tpt.layer_key!r}\n' + f' codec: {tpt.codec_key!r}\n' + f' .laddr={tpt.laddr}\n' + f' .raddr={tpt.raddr}\n' + ) + ( + f' ._transport.stream={tpt.stream}\n' + f' ._transport.drained={tpt.drained}\n' + if privates else '' + ) + ( f' _send_lock={tpt._send_lock.statistics()}\n' - f')>\n' + if privates else '' + ) + ( + ')>\n' ) + return repr_str # NOTE: making this return a value that can be passed to # `eval()` is entirely **optional** FYI! @@ -247,6 +271,10 @@ class Channel: def raddr(self) -> Address|None: return self._transport.raddr if self._transport else None + @property + def maddr(self) -> str: + return self._transport.maddr if self._transport else '' + # TODO: something like, # `pdbp.hideframe_on(errors=[MsgTypeError])` # instead of the `try/except` hack we have rn.. @@ -434,8 +462,8 @@ class Channel: await self.send(aid) peer_aid: Aid = await self.recv() log.runtime( - f'Received hanshake with peer actor,\n' - f'{peer_aid}\n' + f'Received hanshake with peer\n' + f'<= {peer_aid.reprol(sin_uuid=False)}\n' ) # NOTE, we always are referencing the remote peer! 
self.aid = peer_aid diff --git a/tractor/ipc/_server.py b/tractor/ipc/_server.py index a8732c10..e857db19 100644 --- a/tractor/ipc/_server.py +++ b/tractor/ipc/_server.py @@ -26,7 +26,7 @@ from contextlib import ( from functools import partial from itertools import chain import inspect -from pprint import pformat +import textwrap from types import ( ModuleType, ) @@ -43,7 +43,10 @@ from trio import ( SocketListener, ) -# from ..devx import debug +from ..devx.pformat import ( + ppfmt, + nest_from_op, +) from .._exceptions import ( TransportClosed, ) @@ -141,9 +144,8 @@ async def maybe_wait_on_canced_subs( ): log.cancel( - 'Waiting on cancel request to peer..\n' - f'c)=>\n' - f' |_{chan.aid}\n' + 'Waiting on cancel request to peer\n' + f'c)=> {chan.aid.reprol()}@[{chan.maddr}]\n' ) # XXX: this is a soft wait on the channel (and its @@ -179,7 +181,7 @@ async def maybe_wait_on_canced_subs( log.warning( 'Draining msg from disconnected peer\n' f'{chan_info}' - f'{pformat(msg)}\n' + f'{ppfmt(msg)}\n' ) # cid: str|None = msg.get('cid') cid: str|None = msg.cid @@ -248,7 +250,7 @@ async def maybe_wait_on_canced_subs( if children := local_nursery._children: # indent from above local-nurse repr report += ( - f' |_{pformat(children)}\n' + f' |_{ppfmt(children)}\n' ) log.warning(report) @@ -279,8 +281,9 @@ async def maybe_wait_on_canced_subs( log.runtime( f'Peer IPC broke but subproc is alive?\n\n' - f'<=x {chan.aid}@{chan.raddr}\n' - f' |_{proc}\n' + f'<=x {chan.aid.reprol()}@[{chan.maddr}]\n' + f'\n' + f'{proc}\n' ) return local_nursery @@ -324,9 +327,10 @@ async def handle_stream_from_peer( chan = Channel.from_stream(stream) con_status: str = ( - 'New inbound IPC connection <=\n' - f'|_{chan}\n' + f'New inbound IPC transport connection\n' + f'<=( {stream!r}\n' ) + con_status_steps: str = '' # initial handshake with peer phase try: @@ -372,7 +376,7 @@ async def handle_stream_from_peer( if _pre_chan := server._peers.get(uid): familiar: str = 'pre-existing-peer' uid_short: str = 
f'{uid[0]}[{uid[1][-6:]}]' - con_status += ( + con_status_steps += ( f' -> Handshake with {familiar} `{uid_short}` complete\n' ) @@ -397,7 +401,7 @@ async def handle_stream_from_peer( None, ) if event: - con_status += ( + con_status_steps += ( ' -> Waking subactor spawn waiters: ' f'{event.statistics().tasks_waiting}\n' f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n' @@ -408,7 +412,7 @@ async def handle_stream_from_peer( event.set() else: - con_status += ( + con_status_steps += ( f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n' ) # type: ignore @@ -422,8 +426,15 @@ async def handle_stream_from_peer( # TODO: can we just use list-ref directly? chans.append(chan) - con_status += ' -> Entering RPC msg loop..\n' - log.runtime(con_status) + con_status_steps += ' -> Entering RPC msg loop..\n' + log.runtime( + con_status + + + textwrap.indent( + con_status_steps, + prefix=' '*3, # align to first-ln + ) + ) # Begin channel management - respond to remote requests and # process received reponses. @@ -456,41 +467,67 @@ async def handle_stream_from_peer( disconnected=disconnected, ) - # ``Channel`` teardown and closure sequence + # `Channel` teardown and closure sequence # drop ref to channel so it can be gc-ed and disconnected - con_teardown_status: str = ( - f'IPC channel disconnected:\n' - f'<=x uid: {chan.aid}\n' - f' |_{pformat(chan)}\n\n' + # + # -[x]TODO mk this be like + # <=x Channel( + # |_field: blah + # )> + op_repr: str = '<=x ' + chan_repr: str = nest_from_op( + input_op=op_repr, + op_suffix='', + nest_prefix='', + text=chan.pformat(), + nest_indent=len(op_repr)-1, + rm_from_first_ln='<', ) + + con_teardown_status: str = ( + f'IPC channel disconnect\n' + f'\n' + f'{chan_repr}\n' + f'\n' + ) + chans.remove(chan) # TODO: do we need to be this pedantic? 
if not chans: con_teardown_status += ( - f'-> No more channels with {chan.aid}' + f'-> No more channels with {chan.aid.reprol()!r}\n' ) server._peers.pop(uid, None) - peers_str: str = '' - for uid, chans in server._peers.items(): - peers_str += ( - f'uid: {uid}\n' - ) - for i, chan in enumerate(chans): - peers_str += ( - f' |_[{i}] {pformat(chan)}\n' + if peers := list(server._peers.values()): + peer_cnt: int = len(peers) + if ( + (first := peers[0][0]) is not chan + and + not disconnected + and + peer_cnt > 1 + ): + con_teardown_status += ( + f'-> Remaining IPC {peer_cnt-1!r} peers:\n' ) - - con_teardown_status += ( - f'-> Remaining IPC {len(server._peers)} peers: {peers_str}\n' - ) + for chans in server._peers.values(): + first: Channel = chans[0] + if not ( + first is chan + and + disconnected + ): + con_teardown_status += ( + f' |_{first.aid.reprol()!r} -> {len(chans)!r} chans\n' + ) # No more channels to other actors (at all) registered # as connected. if not server._peers: con_teardown_status += ( - 'Signalling no more peer channel connections' + '-> Signalling no more peer connections!\n' ) server._no_more_peers.set() @@ -579,10 +616,10 @@ async def handle_stream_from_peer( class Endpoint(Struct): ''' - An instance of an IPC "bound" address where the lifetime of the - "ability to accept connections" (from clients) and then handle - those inbound sessions or sequences-of-packets is determined by - a (maybe pair of) nurser(y/ies). + An instance of an IPC "bound" address where the lifetime of an + "ability to accept connections" and handle the subsequent + sequence-of-packets (maybe oriented as sessions) is determined by + the underlying nursery scope(s). ''' addr: Address @@ -600,6 +637,24 @@ class Endpoint(Struct): MsgTransport, # handle to encoded-msg transport stream ] = {} + def pformat( + self, + indent: int = 0, + privates: bool = False, + ) -> str: + type_repr: str = type(self).__name__ + fmtstr: str = ( + # !TODO, always be ns aware! 
+ # f'|_netns: {netns}\n' + f' |.addr: {self.addr!r}\n' + f' |_peers: {len(self.peer_tpts)}\n' + ) + return ( + f'<{type_repr}(\n' + f'{fmtstr}' + f')>' + ) + async def start_listener(self) -> SocketListener: tpt_mod: ModuleType = inspect.getmodule(self.addr) lstnr: SocketListener = await tpt_mod.start_listener( @@ -639,11 +694,13 @@ class Endpoint(Struct): class Server(Struct): _parent_tn: Nursery _stream_handler_tn: Nursery + # level-triggered sig for whether "no peers are currently # connected"; field is **always** set to an instance but # initialized with `.is_set() == True`. _no_more_peers: trio.Event + # active eps as allocated by `.listen_on()` _endpoints: list[Endpoint] = [] # connection tracking & mgmt @@ -651,12 +708,19 @@ class Server(Struct): str, # uaid list[Channel], # IPC conns from peer ] = defaultdict(list) + + # events-table with entries registered unset while the local + # actor is waiting on a new actor to inbound connect, often + # a parent waiting on its child just after spawn. _peer_connected: dict[ tuple[str, str], trio.Event, ] = {} # syncs for setup/teardown sequences + # - null when not yet booted, + # - unset when active, + # - set when fully shutdown with 0 eps active. _shutdown: trio.Event|None = None # TODO, maybe just make `._endpoints: list[Endpoint]` and @@ -664,7 +728,6 @@ class Server(Struct): # @property # def addrs2eps(self) -> dict[Address, Endpoint]: # ... 
- @property def proto_keys(self) -> list[str]: return [ @@ -690,7 +753,7 @@ class Server(Struct): # TODO: obvi a different server type when we eventually # support some others XD log.runtime( - f'Cancelling server(s) for\n' + f'Cancelling server(s) for tpt-protos\n' f'{self.proto_keys!r}\n' ) self._parent_tn.cancel_scope.cancel() @@ -717,6 +780,14 @@ class Server(Struct): f'protos: {tpt_protos!r}\n' ) + def len_peers( + self, + ) -> int: + return len([ + chan.connected() + for chan in chain(*self._peers.values()) + ]) + def has_peers( self, check_chans: bool = False, @@ -730,13 +801,11 @@ class Server(Struct): has_peers and check_chans + and + (peer_cnt := self.len_peers()) ): has_peers: bool = ( - any(chan.connected() - for chan in chain( - *self._peers.values() - ) - ) + peer_cnt > 0 and has_peers ) @@ -803,30 +872,66 @@ class Server(Struct): return ev.is_set() - def pformat(self) -> str: + @property + def repr_state(self) -> str: + ''' + A `str`-status describing the current state of this + IPC server in terms of the current operating "phase". 
+ + ''' + status = 'server is active' + if self.has_peers(): + peer_cnt: int = self.len_peers() + status: str = ( + f'{peer_cnt!r} peer chans' + ) + else: + status: str = 'No peer chans' + + if self.is_shutdown(): + status: str = 'server-shutdown' + + return status + + def pformat( + self, + privates: bool = False, + ) -> str: eps: list[Endpoint] = self._endpoints - state_repr: str = ( - f'{len(eps)!r} IPC-endpoints active' - ) + # state_repr: str = ( + # f'{len(eps)!r} endpoints active' + # ) fmtstr = ( - f' |_state: {state_repr}\n' - f' no_more_peers: {self.has_peers()}\n' + f' |_state: {self.repr_state!r}\n' ) - if self._shutdown is not None: - shutdown_stats: EventStatistics = self._shutdown.statistics() + if privates: + fmtstr += f' no_more_peers: {self.has_peers()}\n' + + if self._shutdown is not None: + shutdown_stats: EventStatistics = self._shutdown.statistics() + fmtstr += ( + f' task_waiting_on_shutdown: {shutdown_stats}\n' + ) + + if eps := self._endpoints: + addrs: list[tuple] = [ + ep.addr for ep in eps + ] + repr_eps: str = ppfmt(addrs) + fmtstr += ( - f' task_waiting_on_shutdown: {shutdown_stats}\n' + f' |_endpoints: {repr_eps}\n' + # ^TODO? how to indent closing ']'.. ) - fmtstr += ( - # TODO, use the `ppfmt()` helper from `modden`! 
- f' |_endpoints: {pformat(self._endpoints)}\n' - f' |_peers: {len(self._peers)} connected\n' - ) + if peers := self._peers: + fmtstr += ( + f' |_peers: {len(peers)} connected\n' + ) return ( - f'\n' ) @@ -885,8 +990,8 @@ class Server(Struct): ) log.runtime( - f'Binding to endpoints for,\n' - f'{accept_addrs}\n' + f'Binding endpoints\n' + f'{ppfmt(accept_addrs)}\n' ) eps: list[Endpoint] = await self._parent_tn.start( partial( @@ -896,13 +1001,19 @@ class Server(Struct): listen_addrs=accept_addrs, ) ) + self._endpoints.extend(eps) + + serv_repr: str = nest_from_op( + input_op='(>', + text=self.pformat(), + nest_indent=1, + ) log.runtime( - f'Started IPC endpoints\n' - f'{eps}\n' + f'Started IPC server\n' + f'{serv_repr}' ) - self._endpoints.extend(eps) - # XXX, just a little bit of sanity + # XXX, a little sanity on new ep allocations group_tn: Nursery|None = None ep: Endpoint for ep in eps: @@ -956,9 +1067,13 @@ async def _serve_ipc_eps( stream_handler_tn=stream_handler_tn, ) try: + ep_sclang: str = nest_from_op( + input_op='>[', + text=f'{ep.pformat()}', + ) log.runtime( f'Starting new endpoint listener\n' - f'{ep}\n' + f'{ep_sclang}\n' ) listener: trio.abc.Listener = await ep.start_listener() assert listener is ep._listener @@ -996,17 +1111,6 @@ async def _serve_ipc_eps( handler_nursery=stream_handler_tn ) ) - # TODO, wow make this message better! XD - log.runtime( - 'Started server(s)\n' - + - '\n'.join([f'|_{addr}' for addr in listen_addrs]) - ) - - log.runtime( - f'Started IPC endpoints\n' - f'{eps}\n' - ) task_status.started( eps, ) @@ -1049,8 +1153,7 @@ async def open_ipc_server( try: yield ipc_server log.runtime( - f'Waiting on server to shutdown or be cancelled..\n' - f'{ipc_server}' + 'Server-tn running until terminated\n' ) # TODO? when if ever would we want/need this? 
# with trio.CancelScope(shield=True): diff --git a/tractor/ipc/_tcp.py b/tractor/ipc/_tcp.py index e945cdfb..a1f511d5 100644 --- a/tractor/ipc/_tcp.py +++ b/tractor/ipc/_tcp.py @@ -160,10 +160,9 @@ async def start_listener( Start a TCP socket listener on the given `TCPAddress`. ''' - log.info( - f'Attempting to bind TCP socket\n' - f'>[\n' - f'|_{addr}\n' + log.runtime( + f'Trying socket bind\n' + f'>[ {addr}\n' ) # ?TODO, maybe we should just change the lower-level call this is # using internall per-listener? @@ -178,11 +177,10 @@ async def start_listener( assert len(listeners) == 1 listener = listeners[0] host, port = listener.socket.getsockname()[:2] - + bound_addr: TCPAddress = type(addr).from_addr((host, port)) log.info( f'Listening on TCP socket\n' - f'[>\n' - f' |_{addr}\n' + f'[> {bound_addr}\n' ) return listener diff --git a/tractor/log.py b/tractor/log.py index 393c9571..329562b1 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -81,10 +81,35 @@ BOLD_PALETTE = { } +def at_least_level( + log: Logger|LoggerAdapter, + level: int|str, +) -> bool: + ''' + Predicate to test if a given level is active. + + ''' + if isinstance(level, str): + level: int = CUSTOM_LEVELS[level.upper()] + + if log.getEffectiveLevel() <= level: + return True + return False + + # TODO: this isn't showing the correct '{filename}' # as it did before.. class StackLevelAdapter(LoggerAdapter): + def at_least_level( + self, + level: str, + ) -> bool: + return at_least_level( + log=self, + level=level, + ) + def transport( self, msg: str, @@ -401,19 +426,3 @@ def get_loglevel() -> str: # global module logger for tractor itself log: StackLevelAdapter = get_logger('tractor') - - -def at_least_level( - log: Logger|LoggerAdapter, - level: int|str, -) -> bool: - ''' - Predicate to test if a given level is active. 
- - ''' - if isinstance(level, str): - level: int = CUSTOM_LEVELS[level.upper()] - - if log.getEffectiveLevel() <= level: - return True - return False diff --git a/tractor/msg/_ops.py b/tractor/msg/_ops.py index 9a9c9914..1dad63c8 100644 --- a/tractor/msg/_ops.py +++ b/tractor/msg/_ops.py @@ -210,12 +210,14 @@ class PldRx(Struct): match msg: case Return()|Error(): log.runtime( - f'Rxed final outcome msg\n' + f'Rxed final-outcome msg\n' + f'\n' f'{msg}\n' ) case Stop(): log.runtime( f'Rxed stream stopped msg\n' + f'\n' f'{msg}\n' ) if passthrough_non_pld_msgs: @@ -261,8 +263,9 @@ class PldRx(Struct): if ( type(msg) is Return ): - log.info( + log.runtime( f'Rxed final result msg\n' + f'\n' f'{msg}\n' ) return self.decode_pld( @@ -304,10 +307,13 @@ class PldRx(Struct): try: pld: PayloadT = self._pld_dec.decode(pld) log.runtime( - 'Decoded msg payload\n\n' + f'Decoded payload for\n' + # f'\n' f'{msg}\n' - f'where payload decoded as\n' - f'|_pld={pld!r}\n' + # ^TODO?, ideally just render with `, + # pld={decode}` in the `msg.pformat()`?? + f'where, ' + f'{type(msg).__name__}.pld={pld!r}\n' ) return pld except TypeError as typerr: @@ -494,7 +500,8 @@ def limit_plds( finally: log.runtime( - 'Reverted to previous payload-decoder\n\n' + f'Reverted to previous payload-decoder\n' + f'\n' f'{orig_pldec}\n' ) # sanity on orig settings @@ -629,7 +636,8 @@ async def drain_to_final_msg( (local_cs := rent_n.cancel_scope).cancel_called ): log.cancel( - 'RPC-ctx cancelled by local-parent scope during drain!\n\n' + f'RPC-ctx cancelled by local-parent scope during drain!\n' + f'\n' f'c}}>\n' f' |_{rent_n}\n' f' |_.cancel_scope = {local_cs}\n' @@ -663,7 +671,8 @@ async def drain_to_final_msg( # final result arrived! 
case Return(): log.runtime( - 'Context delivered final draining msg:\n' + f'Context delivered final draining msg\n' + f'\n' f'{pretty_struct.pformat(msg)}' ) ctx._result: Any = pld @@ -697,12 +706,14 @@ async def drain_to_final_msg( ): log.cancel( 'Cancelling `MsgStream` drain since ' - f'{reason}\n\n' + f'{reason}\n' + f'\n' f'<= {ctx.chan.uid}\n' - f' |_{ctx._nsf}()\n\n' + f' |_{ctx._nsf}()\n' + f'\n' f'=> {ctx._task}\n' - f' |_{ctx._stream}\n\n' - + f' |_{ctx._stream}\n' + f'\n' f'{pretty_struct.pformat(msg)}\n' ) break @@ -739,7 +750,8 @@ async def drain_to_final_msg( case Stop(): pre_result_drained.append(msg) log.runtime( # normal/expected shutdown transaction - 'Remote stream terminated due to "stop" msg:\n\n' + f'Remote stream terminated due to "stop" msg\n' + f'\n' f'{pretty_struct.pformat(msg)}\n' ) continue @@ -814,7 +826,8 @@ async def drain_to_final_msg( else: log.cancel( - 'Skipping `MsgStream` drain since final outcome is set\n\n' + f'Skipping `MsgStream` drain since final outcome is set\n' + f'\n' f'{ctx.outcome}\n' ) diff --git a/tractor/msg/types.py b/tractor/msg/types.py index aaf8d137..17d99449 100644 --- a/tractor/msg/types.py +++ b/tractor/msg/types.py @@ -154,6 +154,39 @@ class Aid( # should also include at least `.pid` (equiv to port for tcp) # and/or host-part always? + @property + def uid(self) -> tuple[str, str]: + ''' + Legacy actor "unique-id" pair format. + + ''' + return ( + self.name, + self.uuid, + ) + + def reprol( + self, + sin_uuid: bool = True, + ) -> str: + if not sin_uuid: + return ( + f'{self.name}[{self.uuid[:6]}]@{self.pid!r}' + ) + return ( + f'{self.name}@{self.pid!r}' + ) + + # mk hashable via `.uuid` + def __hash__(self) -> int: + return hash(self.uuid) + + def __eq__(self, other: Aid) -> bool: + return self.uuid == other.uuid + + # use pretty fmt since often repr-ed for console/log + __repr__ = pretty_struct.Struct.__repr__ + class SpawnSpec( pretty_struct.Struct,