diff --git a/tractor/ipc/_chan.py b/tractor/ipc/_chan.py index 0f36b056..64643d95 100644 --- a/tractor/ipc/_chan.py +++ b/tractor/ipc/_chan.py @@ -462,8 +462,8 @@ class Channel: await self.send(aid) peer_aid: Aid = await self.recv() log.runtime( - f'Received hanshake with peer ' - f'{peer_aid.reprol(sin_uuid=False)}\n' + f'Received hanshake with peer\n' + f'<= {peer_aid.reprol(sin_uuid=False)}\n' ) # NOTE, we always are referencing the remote peer! self.aid = peer_aid diff --git a/tractor/ipc/_server.py b/tractor/ipc/_server.py index a8732c10..e857db19 100644 --- a/tractor/ipc/_server.py +++ b/tractor/ipc/_server.py @@ -26,7 +26,7 @@ from contextlib import ( from functools import partial from itertools import chain import inspect -from pprint import pformat +import textwrap from types import ( ModuleType, ) @@ -43,7 +43,10 @@ from trio import ( SocketListener, ) -# from ..devx import debug +from ..devx.pformat import ( + ppfmt, + nest_from_op, +) from .._exceptions import ( TransportClosed, ) @@ -141,9 +144,8 @@ async def maybe_wait_on_canced_subs( ): log.cancel( - 'Waiting on cancel request to peer..\n' - f'c)=>\n' - f' |_{chan.aid}\n' + 'Waiting on cancel request to peer\n' + f'c)=> {chan.aid.reprol()}@[{chan.maddr}]\n' ) # XXX: this is a soft wait on the channel (and its @@ -179,7 +181,7 @@ async def maybe_wait_on_canced_subs( log.warning( 'Draining msg from disconnected peer\n' f'{chan_info}' - f'{pformat(msg)}\n' + f'{ppfmt(msg)}\n' ) # cid: str|None = msg.get('cid') cid: str|None = msg.cid @@ -248,7 +250,7 @@ async def maybe_wait_on_canced_subs( if children := local_nursery._children: # indent from above local-nurse repr report += ( - f' |_{pformat(children)}\n' + f' |_{ppfmt(children)}\n' ) log.warning(report) @@ -279,8 +281,9 @@ async def maybe_wait_on_canced_subs( log.runtime( f'Peer IPC broke but subproc is alive?\n\n' - f'<=x {chan.aid}@{chan.raddr}\n' - f' |_{proc}\n' + f'<=x {chan.aid.reprol()}@[{chan.maddr}]\n' + f'\n' + f'{proc}\n' ) return 
local_nursery @@ -324,9 +327,10 @@ async def handle_stream_from_peer( chan = Channel.from_stream(stream) con_status: str = ( - 'New inbound IPC connection <=\n' - f'|_{chan}\n' + f'New inbound IPC transport connection\n' + f'<=( {stream!r}\n' ) + con_status_steps: str = '' # initial handshake with peer phase try: @@ -372,7 +376,7 @@ async def handle_stream_from_peer( if _pre_chan := server._peers.get(uid): familiar: str = 'pre-existing-peer' uid_short: str = f'{uid[0]}[{uid[1][-6:]}]' - con_status += ( + con_status_steps += ( f' -> Handshake with {familiar} `{uid_short}` complete\n' ) @@ -397,7 +401,7 @@ async def handle_stream_from_peer( None, ) if event: - con_status += ( + con_status_steps += ( ' -> Waking subactor spawn waiters: ' f'{event.statistics().tasks_waiting}\n' f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n' @@ -408,7 +412,7 @@ async def handle_stream_from_peer( event.set() else: - con_status += ( + con_status_steps += ( f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n' ) # type: ignore @@ -422,8 +426,15 @@ async def handle_stream_from_peer( # TODO: can we just use list-ref directly? chans.append(chan) - con_status += ' -> Entering RPC msg loop..\n' - log.runtime(con_status) + con_status_steps += ' -> Entering RPC msg loop..\n' + log.runtime( + con_status + + + textwrap.indent( + con_status_steps, + prefix=' '*3, # align to first-ln + ) + ) # Begin channel management - respond to remote requests and # process received reponses. 
@@ -456,41 +467,67 @@ async def handle_stream_from_peer( disconnected=disconnected, ) - # ``Channel`` teardown and closure sequence + # `Channel` teardown and closure sequence # drop ref to channel so it can be gc-ed and disconnected - con_teardown_status: str = ( - f'IPC channel disconnected:\n' - f'<=x uid: {chan.aid}\n' - f' |_{pformat(chan)}\n\n' + # + # -[x]TODO mk this be like + # <=x Channel( + # |_field: blah + # )> + op_repr: str = '<=x ' + chan_repr: str = nest_from_op( + input_op=op_repr, + op_suffix='', + nest_prefix='', + text=chan.pformat(), + nest_indent=len(op_repr)-1, + rm_from_first_ln='<', ) + + con_teardown_status: str = ( + f'IPC channel disconnect\n' + f'\n' + f'{chan_repr}\n' + f'\n' + ) + chans.remove(chan) # TODO: do we need to be this pedantic? if not chans: con_teardown_status += ( - f'-> No more channels with {chan.aid}' + f'-> No more channels with {chan.aid.reprol()!r}\n' ) server._peers.pop(uid, None) - peers_str: str = '' - for uid, chans in server._peers.items(): - peers_str += ( - f'uid: {uid}\n' - ) - for i, chan in enumerate(chans): - peers_str += ( - f' |_[{i}] {pformat(chan)}\n' + if peers := list(server._peers.values()): + peer_cnt: int = len(peers) + if ( + (first := peers[0][0]) is not chan + and + not disconnected + and + peer_cnt > 1 + ): + con_teardown_status += ( + f'-> Remaining IPC {peer_cnt-1!r} peers:\n' ) - - con_teardown_status += ( - f'-> Remaining IPC {len(server._peers)} peers: {peers_str}\n' - ) + for chans in server._peers.values(): + first: Channel = chans[0] + if not ( + first is chan + and + disconnected + ): + con_teardown_status += ( + f' |_{first.aid.reprol()!r} -> {len(chans)!r} chans\n' + ) # No more channels to other actors (at all) registered # as connected. 
def pformat(
    self,
    indent: int = 0,
    privates: bool = False,
) -> str:
    '''
    Render a multi-line summary of this endpoint: the instance's
    type name, the `repr()` of `.addr` and the number of entries
    in `.peer_tpts`.

    `indent` left-pads every rendered line by that many spaces;
    the default `0` (or any non-positive value) returns the text
    unpadded.

    `privates` is currently unused: no private fields are
    rendered by this method.

    '''
    type_repr: str = type(self).__name__
    fmtstr: str = (
        # !TODO, always be ns aware!
        # f'|_netns: {netns}\n'
        f' |.addr: {self.addr!r}\n'
        f' |_peers: {len(self.peer_tpts)}\n'
    )
    text: str = (
        f'<{type_repr}(\n'
        f'{fmtstr}'
        f')>'
    )
    if indent <= 0:
        return text

    # pad per-line (keeping newlines) so nested field lines stay
    # aligned relative to the opening `<Type(` line.
    pad: str = ' ' * indent
    return ''.join(
        pad + line
        for line in text.splitlines(keepends=True)
    )
def len_peers(
    self,
) -> int:
    '''
    Count the IPC channels, across every entry of `._peers`,
    whose `.connected()` check is truthy.

    Channels which are still tracked but report as disconnected
    are NOT counted.

    '''
    return sum(
        1
        for chan in chain.from_iterable(self._peers.values())
        if chan.connected()
    )


@property
def repr_state(self) -> str:
    '''
    A `str`-status describing the current state of this
    IPC server in terms of the current operating "phase".

    One of,
    - '<n> peer chans' when `.has_peers()` is truthy, where
      `<n>` is the `repr()` of `.len_peers()`,
    - 'No peer chans' otherwise,
    - 'server-shutdown' whenever `.is_shutdown()` is truthy,
      which overrides either of the above.

    '''
    if self.has_peers():
        peer_cnt: int = self.len_peers()
        status: str = f'{peer_cnt!r} peer chans'
    else:
        status: str = 'No peer chans'

    # a completed shutdown takes precedence over any peer status.
    if self.is_shutdown():
        status = 'server-shutdown'

    return status
- f' |_endpoints: {pformat(self._endpoints)}\n' - f' |_peers: {len(self._peers)} connected\n' - ) + if peers := self._peers: + fmtstr += ( + f' |_peers: {len(peers)} connected\n' + ) return ( - f'\n' ) @@ -885,8 +990,8 @@ class Server(Struct): ) log.runtime( - f'Binding to endpoints for,\n' - f'{accept_addrs}\n' + f'Binding endpoints\n' + f'{ppfmt(accept_addrs)}\n' ) eps: list[Endpoint] = await self._parent_tn.start( partial( @@ -896,13 +1001,19 @@ class Server(Struct): listen_addrs=accept_addrs, ) ) + self._endpoints.extend(eps) + + serv_repr: str = nest_from_op( + input_op='(>', + text=self.pformat(), + nest_indent=1, + ) log.runtime( - f'Started IPC endpoints\n' - f'{eps}\n' + f'Started IPC server\n' + f'{serv_repr}' ) - self._endpoints.extend(eps) - # XXX, just a little bit of sanity + # XXX, a little sanity on new ep allocations group_tn: Nursery|None = None ep: Endpoint for ep in eps: @@ -956,9 +1067,13 @@ async def _serve_ipc_eps( stream_handler_tn=stream_handler_tn, ) try: + ep_sclang: str = nest_from_op( + input_op='>[', + text=f'{ep.pformat()}', + ) log.runtime( f'Starting new endpoint listener\n' - f'{ep}\n' + f'{ep_sclang}\n' ) listener: trio.abc.Listener = await ep.start_listener() assert listener is ep._listener @@ -996,17 +1111,6 @@ async def _serve_ipc_eps( handler_nursery=stream_handler_tn ) ) - # TODO, wow make this message better! XD - log.runtime( - 'Started server(s)\n' - + - '\n'.join([f'|_{addr}' for addr in listen_addrs]) - ) - - log.runtime( - f'Started IPC endpoints\n' - f'{eps}\n' - ) task_status.started( eps, ) @@ -1049,8 +1153,7 @@ async def open_ipc_server( try: yield ipc_server log.runtime( - f'Waiting on server to shutdown or be cancelled..\n' - f'{ipc_server}' + 'Server-tn running until terminated\n' ) # TODO? when if ever would we want/need this? 
# with trio.CancelScope(shield=True): diff --git a/tractor/ipc/_tcp.py b/tractor/ipc/_tcp.py index 4de04c25..8e0e9823 100644 --- a/tractor/ipc/_tcp.py +++ b/tractor/ipc/_tcp.py @@ -127,10 +127,9 @@ async def start_listener( Start a TCP socket listener on the given `TCPAddress`. ''' - log.info( - f'Attempting to bind TCP socket\n' - f'>[\n' - f'|_{addr}\n' + log.runtime( + f'Trying socket bind\n' + f'>[ {addr}\n' ) # ?TODO, maybe we should just change the lower-level call this is # using internall per-listener? @@ -145,11 +144,10 @@ async def start_listener( assert len(listeners) == 1 listener = listeners[0] host, port = listener.socket.getsockname()[:2] - + bound_addr: TCPAddress = type(addr).from_addr((host, port)) log.info( f'Listening on TCP socket\n' - f'[>\n' - f' |_{addr}\n' + f'[> {bound_addr}\n' ) return listener