Even more `.ipc.*` repr refinements
Mostly adjusting indentation, noise level, and clarity via `.pformat()` tweaks more general use of `.devx.pformat.nest_from_op()`. Specific impl deats, - use `pformat.ppfmt()/`nest_from_op()` more seriously throughout `._server`. - add a `._server.Endpoint.pformat()`. - add `._server.Server.len_peers()` and `.repr_state()`. - polish `Server.pformat()`. - drop some redundant `log.runtime()`s from `._serve_ipc_eps()` instead leaving-them-only/putting-them in the caller pub meth. - `._tcp.start_listener()` log the bound addr, not the input (which may be the 0-port.to_asyncio_eoc_signal
							parent
							
								
									7f451409ec
								
							
						
					
					
						commit
						82c12253e5
					
				|  | @ -462,8 +462,8 @@ class Channel: | |||
|         await self.send(aid) | ||||
|         peer_aid: Aid = await self.recv() | ||||
|         log.runtime( | ||||
|             f'Received hanshake with peer ' | ||||
|             f'{peer_aid.reprol(sin_uuid=False)}\n' | ||||
|             f'Received hanshake with peer\n' | ||||
|             f'<= {peer_aid.reprol(sin_uuid=False)}\n' | ||||
|         ) | ||||
|         # NOTE, we always are referencing the remote peer! | ||||
|         self.aid = peer_aid | ||||
|  |  | |||
|  | @ -26,7 +26,7 @@ from contextlib import ( | |||
| from functools import partial | ||||
| from itertools import chain | ||||
| import inspect | ||||
| from pprint import pformat | ||||
| import textwrap | ||||
| from types import ( | ||||
|     ModuleType, | ||||
| ) | ||||
|  | @ -43,7 +43,10 @@ from trio import ( | |||
|     SocketListener, | ||||
| ) | ||||
| 
 | ||||
| # from ..devx import debug | ||||
| from ..devx.pformat import ( | ||||
|     ppfmt, | ||||
|     nest_from_op, | ||||
| ) | ||||
| from .._exceptions import ( | ||||
|     TransportClosed, | ||||
| ) | ||||
|  | @ -141,9 +144,8 @@ async def maybe_wait_on_canced_subs( | |||
| 
 | ||||
|     ): | ||||
|         log.cancel( | ||||
|             'Waiting on cancel request to peer..\n' | ||||
|             f'c)=>\n' | ||||
|             f'  |_{chan.aid}\n' | ||||
|             'Waiting on cancel request to peer\n' | ||||
|             f'c)=> {chan.aid.reprol()}@[{chan.maddr}]\n' | ||||
|         ) | ||||
| 
 | ||||
|         # XXX: this is a soft wait on the channel (and its | ||||
|  | @ -179,7 +181,7 @@ async def maybe_wait_on_canced_subs( | |||
|                 log.warning( | ||||
|                     'Draining msg from disconnected peer\n' | ||||
|                     f'{chan_info}' | ||||
|                     f'{pformat(msg)}\n' | ||||
|                     f'{ppfmt(msg)}\n' | ||||
|                 ) | ||||
|                 # cid: str|None = msg.get('cid') | ||||
|                 cid: str|None = msg.cid | ||||
|  | @ -248,7 +250,7 @@ async def maybe_wait_on_canced_subs( | |||
|                 if children := local_nursery._children: | ||||
|                     # indent from above local-nurse repr | ||||
|                     report += ( | ||||
|                         f'   |_{pformat(children)}\n' | ||||
|                         f'   |_{ppfmt(children)}\n' | ||||
|                     ) | ||||
| 
 | ||||
|                 log.warning(report) | ||||
|  | @ -279,8 +281,9 @@ async def maybe_wait_on_canced_subs( | |||
|                     log.runtime( | ||||
|                         f'Peer IPC broke but subproc is alive?\n\n' | ||||
| 
 | ||||
|                         f'<=x {chan.aid}@{chan.raddr}\n' | ||||
|                         f'   |_{proc}\n' | ||||
|                         f'<=x {chan.aid.reprol()}@[{chan.maddr}]\n' | ||||
|                         f'\n' | ||||
|                         f'{proc}\n' | ||||
|                     ) | ||||
| 
 | ||||
|     return local_nursery | ||||
|  | @ -324,9 +327,10 @@ async def handle_stream_from_peer( | |||
| 
 | ||||
|     chan = Channel.from_stream(stream) | ||||
|     con_status: str = ( | ||||
|         'New inbound IPC connection <=\n' | ||||
|         f'|_{chan}\n' | ||||
|         f'New inbound IPC transport connection\n' | ||||
|         f'<=( {stream!r}\n' | ||||
|     ) | ||||
|     con_status_steps: str = '' | ||||
| 
 | ||||
|     # initial handshake with peer phase | ||||
|     try: | ||||
|  | @ -372,7 +376,7 @@ async def handle_stream_from_peer( | |||
|     if _pre_chan := server._peers.get(uid): | ||||
|         familiar: str = 'pre-existing-peer' | ||||
|     uid_short: str = f'{uid[0]}[{uid[1][-6:]}]' | ||||
|     con_status += ( | ||||
|     con_status_steps += ( | ||||
|         f' -> Handshake with {familiar} `{uid_short}` complete\n' | ||||
|     ) | ||||
| 
 | ||||
|  | @ -397,7 +401,7 @@ async def handle_stream_from_peer( | |||
|         None, | ||||
|     ) | ||||
|     if event: | ||||
|         con_status += ( | ||||
|         con_status_steps += ( | ||||
|             ' -> Waking subactor spawn waiters: ' | ||||
|             f'{event.statistics().tasks_waiting}\n' | ||||
|             f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n' | ||||
|  | @ -408,7 +412,7 @@ async def handle_stream_from_peer( | |||
|         event.set() | ||||
| 
 | ||||
|     else: | ||||
|         con_status += ( | ||||
|         con_status_steps += ( | ||||
|             f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n' | ||||
|         )  # type: ignore | ||||
| 
 | ||||
|  | @ -422,8 +426,15 @@ async def handle_stream_from_peer( | |||
|     # TODO: can we just use list-ref directly? | ||||
|     chans.append(chan) | ||||
| 
 | ||||
|     con_status += ' -> Entering RPC msg loop..\n' | ||||
|     log.runtime(con_status) | ||||
|     con_status_steps += ' -> Entering RPC msg loop..\n' | ||||
|     log.runtime( | ||||
|         con_status | ||||
|         + | ||||
|         textwrap.indent( | ||||
|             con_status_steps, | ||||
|             prefix=' '*3,  # align to first-ln | ||||
|         ) | ||||
|     ) | ||||
| 
 | ||||
|     # Begin channel management - respond to remote requests and | ||||
|     # process received reponses. | ||||
|  | @ -456,41 +467,67 @@ async def handle_stream_from_peer( | |||
|             disconnected=disconnected, | ||||
|         ) | ||||
| 
 | ||||
|         # ``Channel`` teardown and closure sequence | ||||
|         # `Channel` teardown and closure sequence | ||||
|         # drop ref to channel so it can be gc-ed and disconnected | ||||
|         con_teardown_status: str = ( | ||||
|             f'IPC channel disconnected:\n' | ||||
|             f'<=x uid: {chan.aid}\n' | ||||
|             f'   |_{pformat(chan)}\n\n' | ||||
|         # | ||||
|         # -[x]TODO mk this be like | ||||
|         # <=x Channel( | ||||
|         #     |_field: blah | ||||
|         # )> | ||||
|         op_repr: str = '<=x ' | ||||
|         chan_repr: str = nest_from_op( | ||||
|             input_op=op_repr, | ||||
|             op_suffix='', | ||||
|             nest_prefix='', | ||||
|             text=chan.pformat(), | ||||
|             nest_indent=len(op_repr)-1, | ||||
|             rm_from_first_ln='<', | ||||
|         ) | ||||
| 
 | ||||
|         con_teardown_status: str = ( | ||||
|             f'IPC channel disconnect\n' | ||||
|             f'\n' | ||||
|             f'{chan_repr}\n' | ||||
|             f'\n' | ||||
|         ) | ||||
| 
 | ||||
|         chans.remove(chan) | ||||
| 
 | ||||
|         # TODO: do we need to be this pedantic? | ||||
|         if not chans: | ||||
|             con_teardown_status += ( | ||||
|                 f'-> No more channels with {chan.aid}' | ||||
|                 f'-> No more channels with {chan.aid.reprol()!r}\n' | ||||
|             ) | ||||
|             server._peers.pop(uid, None) | ||||
| 
 | ||||
|         peers_str: str = '' | ||||
|         for uid, chans in server._peers.items(): | ||||
|             peers_str += ( | ||||
|                 f'uid: {uid}\n' | ||||
|             ) | ||||
|             for i, chan in enumerate(chans): | ||||
|                 peers_str += ( | ||||
|                     f' |_[{i}] {pformat(chan)}\n' | ||||
|         if peers := list(server._peers.values()): | ||||
|             peer_cnt: int = len(peers) | ||||
|             if ( | ||||
|                 (first := peers[0][0]) is not chan | ||||
|                 and | ||||
|                 not disconnected | ||||
|                 and | ||||
|                 peer_cnt > 1 | ||||
|             ): | ||||
|                 con_teardown_status += ( | ||||
|                     f'-> Remaining IPC {peer_cnt-1!r} peers:\n' | ||||
|                 ) | ||||
| 
 | ||||
|         con_teardown_status += ( | ||||
|             f'-> Remaining IPC {len(server._peers)} peers: {peers_str}\n' | ||||
|         ) | ||||
|                 for chans in server._peers.values(): | ||||
|                     first: Channel = chans[0] | ||||
|                     if not ( | ||||
|                         first is chan | ||||
|                         and | ||||
|                         disconnected | ||||
|                     ): | ||||
|                         con_teardown_status += ( | ||||
|                             f'  |_{first.aid.reprol()!r} -> {len(chans)!r} chans\n' | ||||
|                         ) | ||||
| 
 | ||||
|         # No more channels to other actors (at all) registered | ||||
|         # as connected. | ||||
|         if not server._peers: | ||||
|             con_teardown_status += ( | ||||
|                 'Signalling no more peer channel connections' | ||||
|                 '-> Signalling no more peer connections!\n' | ||||
|             ) | ||||
|             server._no_more_peers.set() | ||||
| 
 | ||||
|  | @ -579,10 +616,10 @@ async def handle_stream_from_peer( | |||
| 
 | ||||
| class Endpoint(Struct): | ||||
|     ''' | ||||
|     An instance of an IPC "bound" address where the lifetime of the | ||||
|     "ability to accept connections" (from clients) and then handle | ||||
|     those inbound sessions or sequences-of-packets is determined by | ||||
|     a (maybe pair of) nurser(y/ies). | ||||
|     An instance of an IPC "bound" address where the lifetime of an | ||||
|     "ability to accept connections" and handle the subsequent | ||||
|     sequence-of-packets (maybe oriented as sessions) is determined by | ||||
|     the underlying nursery scope(s). | ||||
| 
 | ||||
|     ''' | ||||
|     addr: Address | ||||
|  | @ -600,6 +637,24 @@ class Endpoint(Struct): | |||
|         MsgTransport,  # handle to encoded-msg transport stream | ||||
|     ] = {} | ||||
| 
 | ||||
|     def pformat( | ||||
|         self, | ||||
|         indent: int = 0, | ||||
|         privates: bool = False, | ||||
|     ) -> str: | ||||
|         type_repr: str = type(self).__name__ | ||||
|         fmtstr: str = ( | ||||
|             # !TODO, always be ns aware! | ||||
|             # f'|_netns: {netns}\n' | ||||
|             f' |.addr: {self.addr!r}\n' | ||||
|             f' |_peers: {len(self.peer_tpts)}\n' | ||||
|         ) | ||||
|         return ( | ||||
|             f'<{type_repr}(\n' | ||||
|             f'{fmtstr}' | ||||
|             f')>' | ||||
|         ) | ||||
| 
 | ||||
|     async def start_listener(self) -> SocketListener: | ||||
|         tpt_mod: ModuleType = inspect.getmodule(self.addr) | ||||
|         lstnr: SocketListener = await tpt_mod.start_listener( | ||||
|  | @ -639,11 +694,13 @@ class Endpoint(Struct): | |||
| class Server(Struct): | ||||
|     _parent_tn: Nursery | ||||
|     _stream_handler_tn: Nursery | ||||
| 
 | ||||
|     # level-triggered sig for whether "no peers are currently | ||||
|     # connected"; field is **always** set to an instance but | ||||
|     # initialized with `.is_set() == True`. | ||||
|     _no_more_peers: trio.Event | ||||
| 
 | ||||
|     # active eps as allocated by `.listen_on()` | ||||
|     _endpoints: list[Endpoint] = [] | ||||
| 
 | ||||
|     # connection tracking & mgmt | ||||
|  | @ -651,12 +708,19 @@ class Server(Struct): | |||
|         str,  # uaid | ||||
|         list[Channel],  # IPC conns from peer | ||||
|     ] = defaultdict(list) | ||||
| 
 | ||||
|     # events-table with entries registered unset while the local | ||||
|     # actor is waiting on a new actor to inbound connect, often | ||||
|     # a parent waiting on its child just after spawn. | ||||
|     _peer_connected: dict[ | ||||
|         tuple[str, str], | ||||
|         trio.Event, | ||||
|     ] = {} | ||||
| 
 | ||||
|     # syncs for setup/teardown sequences | ||||
|     # - null when not yet booted, | ||||
|     # - unset when active, | ||||
|     # - set when fully shutdown with 0 eps active. | ||||
|     _shutdown: trio.Event|None = None | ||||
| 
 | ||||
|     # TODO, maybe just make `._endpoints: list[Endpoint]` and | ||||
|  | @ -664,7 +728,6 @@ class Server(Struct): | |||
|     # @property | ||||
|     # def addrs2eps(self) -> dict[Address, Endpoint]: | ||||
|     #     ... | ||||
| 
 | ||||
|     @property | ||||
|     def proto_keys(self) -> list[str]: | ||||
|         return [ | ||||
|  | @ -690,7 +753,7 @@ class Server(Struct): | |||
|             # TODO: obvi a different server type when we eventually | ||||
|             # support some others XD | ||||
|             log.runtime( | ||||
|                 f'Cancelling server(s) for\n' | ||||
|                 f'Cancelling server(s) for tpt-protos\n' | ||||
|                 f'{self.proto_keys!r}\n' | ||||
|             ) | ||||
|             self._parent_tn.cancel_scope.cancel() | ||||
|  | @ -717,6 +780,14 @@ class Server(Struct): | |||
|                 f'protos: {tpt_protos!r}\n' | ||||
|             ) | ||||
| 
 | ||||
|     def len_peers( | ||||
|         self, | ||||
|     ) -> int: | ||||
|         return len([ | ||||
|             chan.connected() | ||||
|             for chan in chain(*self._peers.values()) | ||||
|         ]) | ||||
| 
 | ||||
|     def has_peers( | ||||
|         self, | ||||
|         check_chans: bool = False, | ||||
|  | @ -730,13 +801,11 @@ class Server(Struct): | |||
|             has_peers | ||||
|             and | ||||
|             check_chans | ||||
|             and | ||||
|             (peer_cnt := self.len_peers()) | ||||
|         ): | ||||
|             has_peers: bool = ( | ||||
|                 any(chan.connected() | ||||
|                     for chan in chain( | ||||
|                         *self._peers.values() | ||||
|                     ) | ||||
|                 ) | ||||
|                 peer_cnt > 0 | ||||
|                 and | ||||
|                 has_peers | ||||
|             ) | ||||
|  | @ -803,30 +872,66 @@ class Server(Struct): | |||
| 
 | ||||
|         return ev.is_set() | ||||
| 
 | ||||
|     def pformat(self) -> str: | ||||
|     @property | ||||
|     def repr_state(self) -> str: | ||||
|         ''' | ||||
|         A `str`-status describing the current state of this | ||||
|         IPC server in terms of the current operating "phase". | ||||
| 
 | ||||
|         ''' | ||||
|         status = 'server is active' | ||||
|         if self.has_peers(): | ||||
|             peer_cnt: int = self.len_peers() | ||||
|             status: str = ( | ||||
|                 f'{peer_cnt!r} peer chans' | ||||
|             ) | ||||
|         else: | ||||
|             status: str = 'No peer chans' | ||||
| 
 | ||||
|         if self.is_shutdown(): | ||||
|             status: str = 'server-shutdown' | ||||
| 
 | ||||
|         return status | ||||
| 
 | ||||
|     def pformat( | ||||
|         self, | ||||
|         privates: bool = False, | ||||
|     ) -> str: | ||||
|         eps: list[Endpoint] = self._endpoints | ||||
| 
 | ||||
|         state_repr: str = ( | ||||
|             f'{len(eps)!r} IPC-endpoints active' | ||||
|         ) | ||||
|         # state_repr: str = ( | ||||
|         #     f'{len(eps)!r} endpoints active' | ||||
|         # ) | ||||
|         fmtstr = ( | ||||
|             f' |_state: {state_repr}\n' | ||||
|             f'   no_more_peers: {self.has_peers()}\n' | ||||
|             f' |_state: {self.repr_state!r}\n' | ||||
|         ) | ||||
|         if self._shutdown is not None: | ||||
|             shutdown_stats: EventStatistics = self._shutdown.statistics() | ||||
|         if privates: | ||||
|             fmtstr += f'   no_more_peers: {self.has_peers()}\n' | ||||
| 
 | ||||
|             if self._shutdown is not None: | ||||
|                 shutdown_stats: EventStatistics = self._shutdown.statistics() | ||||
|                 fmtstr += ( | ||||
|                     f'   task_waiting_on_shutdown: {shutdown_stats}\n' | ||||
|                 ) | ||||
| 
 | ||||
|         if eps := self._endpoints: | ||||
|             addrs: list[tuple] = [ | ||||
|                 ep.addr for ep in eps | ||||
|             ] | ||||
|             repr_eps: str = ppfmt(addrs) | ||||
| 
 | ||||
|             fmtstr += ( | ||||
|                 f'   task_waiting_on_shutdown: {shutdown_stats}\n' | ||||
|                 f' |_endpoints: {repr_eps}\n' | ||||
|                 # ^TODO? how to indent closing ']'.. | ||||
|             ) | ||||
| 
 | ||||
|         fmtstr += ( | ||||
|             # TODO, use the `ppfmt()` helper from `modden`! | ||||
|             f' |_endpoints: {pformat(self._endpoints)}\n' | ||||
|             f' |_peers: {len(self._peers)} connected\n' | ||||
|         ) | ||||
|         if peers := self._peers: | ||||
|             fmtstr += ( | ||||
|                 f' |_peers: {len(peers)} connected\n' | ||||
|             ) | ||||
| 
 | ||||
|         return ( | ||||
|             f'<IPCServer(\n' | ||||
|             f'<Server(\n' | ||||
|             f'{fmtstr}' | ||||
|             f')>\n' | ||||
|         ) | ||||
|  | @ -885,8 +990,8 @@ class Server(Struct): | |||
|             ) | ||||
| 
 | ||||
|         log.runtime( | ||||
|             f'Binding to endpoints for,\n' | ||||
|             f'{accept_addrs}\n' | ||||
|             f'Binding endpoints\n' | ||||
|             f'{ppfmt(accept_addrs)}\n' | ||||
|         ) | ||||
|         eps: list[Endpoint] = await self._parent_tn.start( | ||||
|             partial( | ||||
|  | @ -896,13 +1001,19 @@ class Server(Struct): | |||
|                 listen_addrs=accept_addrs, | ||||
|             ) | ||||
|         ) | ||||
|         self._endpoints.extend(eps) | ||||
| 
 | ||||
|         serv_repr: str = nest_from_op( | ||||
|             input_op='(>', | ||||
|             text=self.pformat(), | ||||
|             nest_indent=1, | ||||
|         ) | ||||
|         log.runtime( | ||||
|             f'Started IPC endpoints\n' | ||||
|             f'{eps}\n' | ||||
|             f'Started IPC server\n' | ||||
|             f'{serv_repr}' | ||||
|         ) | ||||
| 
 | ||||
|         self._endpoints.extend(eps) | ||||
|         # XXX, just a little bit of sanity | ||||
|         # XXX, a little sanity on new ep allocations | ||||
|         group_tn: Nursery|None = None | ||||
|         ep: Endpoint | ||||
|         for ep in eps: | ||||
|  | @ -956,9 +1067,13 @@ async def _serve_ipc_eps( | |||
|                     stream_handler_tn=stream_handler_tn, | ||||
|                 ) | ||||
|                 try: | ||||
|                     ep_sclang: str = nest_from_op( | ||||
|                         input_op='>[', | ||||
|                         text=f'{ep.pformat()}', | ||||
|                     ) | ||||
|                     log.runtime( | ||||
|                         f'Starting new endpoint listener\n' | ||||
|                         f'{ep}\n' | ||||
|                         f'{ep_sclang}\n' | ||||
|                     ) | ||||
|                     listener: trio.abc.Listener = await ep.start_listener() | ||||
|                     assert listener is ep._listener | ||||
|  | @ -996,17 +1111,6 @@ async def _serve_ipc_eps( | |||
|                     handler_nursery=stream_handler_tn | ||||
|                 ) | ||||
|             ) | ||||
|             # TODO, wow make this message better! XD | ||||
|             log.runtime( | ||||
|                 'Started server(s)\n' | ||||
|                 + | ||||
|                 '\n'.join([f'|_{addr}' for addr in listen_addrs]) | ||||
|             ) | ||||
| 
 | ||||
|             log.runtime( | ||||
|                 f'Started IPC endpoints\n' | ||||
|                 f'{eps}\n' | ||||
|             ) | ||||
|             task_status.started( | ||||
|                 eps, | ||||
|             ) | ||||
|  | @ -1049,8 +1153,7 @@ async def open_ipc_server( | |||
|         try: | ||||
|             yield ipc_server | ||||
|             log.runtime( | ||||
|                 f'Waiting on server to shutdown or be cancelled..\n' | ||||
|                 f'{ipc_server}' | ||||
|                 'Server-tn running until terminated\n' | ||||
|             ) | ||||
|             # TODO? when if ever would we want/need this? | ||||
|             # with trio.CancelScope(shield=True): | ||||
|  |  | |||
|  | @ -160,10 +160,9 @@ async def start_listener( | |||
|     Start a TCP socket listener on the given `TCPAddress`. | ||||
| 
 | ||||
|     ''' | ||||
|     log.info( | ||||
|         f'Attempting to bind TCP socket\n' | ||||
|         f'>[\n' | ||||
|         f'|_{addr}\n' | ||||
|     log.runtime( | ||||
|         f'Trying socket bind\n' | ||||
|         f'>[ {addr}\n' | ||||
|     ) | ||||
|     # ?TODO, maybe we should just change the lower-level call this is | ||||
|     # using internall per-listener? | ||||
|  | @ -178,11 +177,10 @@ async def start_listener( | |||
|     assert len(listeners) == 1 | ||||
|     listener = listeners[0] | ||||
|     host, port = listener.socket.getsockname()[:2] | ||||
| 
 | ||||
|     bound_addr: TCPAddress = type(addr).from_addr((host, port)) | ||||
|     log.info( | ||||
|         f'Listening on TCP socket\n' | ||||
|         f'[>\n' | ||||
|         f' |_{addr}\n' | ||||
|         f'[> {bound_addr}\n' | ||||
|     ) | ||||
|     return listener | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue