Even more `.ipc.*` repr refinements

Mostly adjusting indentation, noise level, and clarity via `.pformat()`
tweaks more general use of `.devx.pformat.nest_from_op()`.

Specific impl deats,
- use `pformat.ppfmt()/`nest_from_op()` more seriously throughout
  `._server`.
- add a `._server.Endpoint.pformat()`.
- add `._server.Server.len_peers()` and `.repr_state()`.
- polish `Server.pformat()`.
- drop some redundant `log.runtime()`s from `._serve_ipc_eps()` instead
  leaving-them-only/putting-them in the caller pub meth.
- `._tcp.start_listener()` log the bound addr, not the input (which may
  be the 0-port.
enable_tpts
Tyler Goodlet 2025-07-03 23:33:02 -04:00
parent 3201437f4e
commit 065104401c
3 changed files with 191 additions and 90 deletions

View File

@ -462,8 +462,8 @@ class Channel:
await self.send(aid)
peer_aid: Aid = await self.recv()
log.runtime(
f'Received hanshake with peer '
f'{peer_aid.reprol(sin_uuid=False)}\n'
f'Received hanshake with peer\n'
f'<= {peer_aid.reprol(sin_uuid=False)}\n'
)
# NOTE, we always are referencing the remote peer!
self.aid = peer_aid

View File

@ -26,7 +26,7 @@ from contextlib import (
from functools import partial
from itertools import chain
import inspect
from pprint import pformat
import textwrap
from types import (
ModuleType,
)
@ -43,7 +43,10 @@ from trio import (
SocketListener,
)
# from ..devx import debug
from ..devx.pformat import (
ppfmt,
nest_from_op,
)
from .._exceptions import (
TransportClosed,
)
@ -141,9 +144,8 @@ async def maybe_wait_on_canced_subs(
):
log.cancel(
'Waiting on cancel request to peer..\n'
f'c)=>\n'
f' |_{chan.aid}\n'
'Waiting on cancel request to peer\n'
f'c)=> {chan.aid.reprol()}@[{chan.maddr}]\n'
)
# XXX: this is a soft wait on the channel (and its
@ -179,7 +181,7 @@ async def maybe_wait_on_canced_subs(
log.warning(
'Draining msg from disconnected peer\n'
f'{chan_info}'
f'{pformat(msg)}\n'
f'{ppfmt(msg)}\n'
)
# cid: str|None = msg.get('cid')
cid: str|None = msg.cid
@ -248,7 +250,7 @@ async def maybe_wait_on_canced_subs(
if children := local_nursery._children:
# indent from above local-nurse repr
report += (
f' |_{pformat(children)}\n'
f' |_{ppfmt(children)}\n'
)
log.warning(report)
@ -279,8 +281,9 @@ async def maybe_wait_on_canced_subs(
log.runtime(
f'Peer IPC broke but subproc is alive?\n\n'
f'<=x {chan.aid}@{chan.raddr}\n'
f' |_{proc}\n'
f'<=x {chan.aid.reprol()}@[{chan.maddr}]\n'
f'\n'
f'{proc}\n'
)
return local_nursery
@ -324,9 +327,10 @@ async def handle_stream_from_peer(
chan = Channel.from_stream(stream)
con_status: str = (
'New inbound IPC connection <=\n'
f'|_{chan}\n'
f'New inbound IPC transport connection\n'
f'<=( {stream!r}\n'
)
con_status_steps: str = ''
# initial handshake with peer phase
try:
@ -372,7 +376,7 @@ async def handle_stream_from_peer(
if _pre_chan := server._peers.get(uid):
familiar: str = 'pre-existing-peer'
uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
con_status += (
con_status_steps += (
f' -> Handshake with {familiar} `{uid_short}` complete\n'
)
@ -397,7 +401,7 @@ async def handle_stream_from_peer(
None,
)
if event:
con_status += (
con_status_steps += (
' -> Waking subactor spawn waiters: '
f'{event.statistics().tasks_waiting}\n'
f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
@ -408,7 +412,7 @@ async def handle_stream_from_peer(
event.set()
else:
con_status += (
con_status_steps += (
f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
) # type: ignore
@ -422,8 +426,15 @@ async def handle_stream_from_peer(
# TODO: can we just use list-ref directly?
chans.append(chan)
con_status += ' -> Entering RPC msg loop..\n'
log.runtime(con_status)
con_status_steps += ' -> Entering RPC msg loop..\n'
log.runtime(
con_status
+
textwrap.indent(
con_status_steps,
prefix=' '*3, # align to first-ln
)
)
# Begin channel management - respond to remote requests and
# process received reponses.
@ -456,41 +467,67 @@ async def handle_stream_from_peer(
disconnected=disconnected,
)
# ``Channel`` teardown and closure sequence
# `Channel` teardown and closure sequence
# drop ref to channel so it can be gc-ed and disconnected
con_teardown_status: str = (
f'IPC channel disconnected:\n'
f'<=x uid: {chan.aid}\n'
f' |_{pformat(chan)}\n\n'
#
# -[x]TODO mk this be like
# <=x Channel(
# |_field: blah
# )>
op_repr: str = '<=x '
chan_repr: str = nest_from_op(
input_op=op_repr,
op_suffix='',
nest_prefix='',
text=chan.pformat(),
nest_indent=len(op_repr)-1,
rm_from_first_ln='<',
)
con_teardown_status: str = (
f'IPC channel disconnect\n'
f'\n'
f'{chan_repr}\n'
f'\n'
)
chans.remove(chan)
# TODO: do we need to be this pedantic?
if not chans:
con_teardown_status += (
f'-> No more channels with {chan.aid}'
f'-> No more channels with {chan.aid.reprol()!r}\n'
)
server._peers.pop(uid, None)
peers_str: str = ''
for uid, chans in server._peers.items():
peers_str += (
f'uid: {uid}\n'
)
for i, chan in enumerate(chans):
peers_str += (
f' |_[{i}] {pformat(chan)}\n'
)
if peers := list(server._peers.values()):
peer_cnt: int = len(peers)
if (
(first := peers[0][0]) is not chan
and
not disconnected
and
peer_cnt > 1
):
con_teardown_status += (
f'-> Remaining IPC {len(server._peers)} peers: {peers_str}\n'
f'-> Remaining IPC {peer_cnt-1!r} peers:\n'
)
for chans in server._peers.values():
first: Channel = chans[0]
if not (
first is chan
and
disconnected
):
con_teardown_status += (
f' |_{first.aid.reprol()!r} -> {len(chans)!r} chans\n'
)
# No more channels to other actors (at all) registered
# as connected.
if not server._peers:
con_teardown_status += (
'Signalling no more peer channel connections'
'-> Signalling no more peer connections!\n'
)
server._no_more_peers.set()
@ -579,10 +616,10 @@ async def handle_stream_from_peer(
class Endpoint(Struct):
'''
An instance of an IPC "bound" address where the lifetime of the
"ability to accept connections" (from clients) and then handle
those inbound sessions or sequences-of-packets is determined by
a (maybe pair of) nurser(y/ies).
An instance of an IPC "bound" address where the lifetime of an
"ability to accept connections" and handle the subsequent
sequence-of-packets (maybe oriented as sessions) is determined by
the underlying nursery scope(s).
'''
addr: Address
@ -600,6 +637,24 @@ class Endpoint(Struct):
MsgTransport, # handle to encoded-msg transport stream
] = {}
def pformat(
self,
indent: int = 0,
privates: bool = False,
) -> str:
type_repr: str = type(self).__name__
fmtstr: str = (
# !TODO, always be ns aware!
# f'|_netns: {netns}\n'
f' |.addr: {self.addr!r}\n'
f' |_peers: {len(self.peer_tpts)}\n'
)
return (
f'<{type_repr}(\n'
f'{fmtstr}'
f')>'
)
async def start_listener(self) -> SocketListener:
tpt_mod: ModuleType = inspect.getmodule(self.addr)
lstnr: SocketListener = await tpt_mod.start_listener(
@ -639,11 +694,13 @@ class Endpoint(Struct):
class Server(Struct):
_parent_tn: Nursery
_stream_handler_tn: Nursery
# level-triggered sig for whether "no peers are currently
# connected"; field is **always** set to an instance but
# initialized with `.is_set() == True`.
_no_more_peers: trio.Event
# active eps as allocated by `.listen_on()`
_endpoints: list[Endpoint] = []
# connection tracking & mgmt
@ -651,12 +708,19 @@ class Server(Struct):
str, # uaid
list[Channel], # IPC conns from peer
] = defaultdict(list)
# events-table with entries registered unset while the local
# actor is waiting on a new actor to inbound connect, often
# a parent waiting on its child just after spawn.
_peer_connected: dict[
tuple[str, str],
trio.Event,
] = {}
# syncs for setup/teardown sequences
# - null when not yet booted,
# - unset when active,
# - set when fully shutdown with 0 eps active.
_shutdown: trio.Event|None = None
# TODO, maybe just make `._endpoints: list[Endpoint]` and
@ -664,7 +728,6 @@ class Server(Struct):
# @property
# def addrs2eps(self) -> dict[Address, Endpoint]:
# ...
@property
def proto_keys(self) -> list[str]:
return [
@ -690,7 +753,7 @@ class Server(Struct):
# TODO: obvi a different server type when we eventually
# support some others XD
log.runtime(
f'Cancelling server(s) for\n'
f'Cancelling server(s) for tpt-protos\n'
f'{self.proto_keys!r}\n'
)
self._parent_tn.cancel_scope.cancel()
@ -717,6 +780,14 @@ class Server(Struct):
f'protos: {tpt_protos!r}\n'
)
def len_peers(
self,
) -> int:
return len([
chan.connected()
for chan in chain(*self._peers.values())
])
def has_peers(
self,
check_chans: bool = False,
@ -730,13 +801,11 @@ class Server(Struct):
has_peers
and
check_chans
and
(peer_cnt := self.len_peers())
):
has_peers: bool = (
any(chan.connected()
for chan in chain(
*self._peers.values()
)
)
peer_cnt > 0
and
has_peers
)
@ -803,30 +872,66 @@ class Server(Struct):
return ev.is_set()
def pformat(self) -> str:
@property
def repr_state(self) -> str:
'''
A `str`-status describing the current state of this
IPC server in terms of the current operating "phase".
'''
status = 'server is active'
if self.has_peers():
peer_cnt: int = self.len_peers()
status: str = (
f'{peer_cnt!r} peer chans'
)
else:
status: str = 'No peer chans'
if self.is_shutdown():
status: str = 'server-shutdown'
return status
def pformat(
self,
privates: bool = False,
) -> str:
eps: list[Endpoint] = self._endpoints
state_repr: str = (
f'{len(eps)!r} IPC-endpoints active'
)
# state_repr: str = (
# f'{len(eps)!r} endpoints active'
# )
fmtstr = (
f' |_state: {state_repr}\n'
f' no_more_peers: {self.has_peers()}\n'
f' |_state: {self.repr_state!r}\n'
)
if privates:
fmtstr += f' no_more_peers: {self.has_peers()}\n'
if self._shutdown is not None:
shutdown_stats: EventStatistics = self._shutdown.statistics()
fmtstr += (
f' task_waiting_on_shutdown: {shutdown_stats}\n'
)
if eps := self._endpoints:
addrs: list[tuple] = [
ep.addr for ep in eps
]
repr_eps: str = ppfmt(addrs)
fmtstr += (
# TODO, use the `ppfmt()` helper from `modden`!
f' |_endpoints: {pformat(self._endpoints)}\n'
f' |_peers: {len(self._peers)} connected\n'
f' |_endpoints: {repr_eps}\n'
# ^TODO? how to indent closing ']'..
)
if peers := self._peers:
fmtstr += (
f' |_peers: {len(peers)} connected\n'
)
return (
f'<IPCServer(\n'
f'<Server(\n'
f'{fmtstr}'
f')>\n'
)
@ -885,8 +990,8 @@ class Server(Struct):
)
log.runtime(
f'Binding to endpoints for,\n'
f'{accept_addrs}\n'
f'Binding endpoints\n'
f'{ppfmt(accept_addrs)}\n'
)
eps: list[Endpoint] = await self._parent_tn.start(
partial(
@ -896,13 +1001,19 @@ class Server(Struct):
listen_addrs=accept_addrs,
)
)
self._endpoints.extend(eps)
serv_repr: str = nest_from_op(
input_op='(>',
text=self.pformat(),
nest_indent=1,
)
log.runtime(
f'Started IPC endpoints\n'
f'{eps}\n'
f'Started IPC server\n'
f'{serv_repr}'
)
self._endpoints.extend(eps)
# XXX, just a little bit of sanity
# XXX, a little sanity on new ep allocations
group_tn: Nursery|None = None
ep: Endpoint
for ep in eps:
@ -956,9 +1067,13 @@ async def _serve_ipc_eps(
stream_handler_tn=stream_handler_tn,
)
try:
ep_sclang: str = nest_from_op(
input_op='>[',
text=f'{ep.pformat()}',
)
log.runtime(
f'Starting new endpoint listener\n'
f'{ep}\n'
f'{ep_sclang}\n'
)
listener: trio.abc.Listener = await ep.start_listener()
assert listener is ep._listener
@ -996,17 +1111,6 @@ async def _serve_ipc_eps(
handler_nursery=stream_handler_tn
)
)
# TODO, wow make this message better! XD
log.runtime(
'Started server(s)\n'
+
'\n'.join([f'|_{addr}' for addr in listen_addrs])
)
log.runtime(
f'Started IPC endpoints\n'
f'{eps}\n'
)
task_status.started(
eps,
)
@ -1049,8 +1153,7 @@ async def open_ipc_server(
try:
yield ipc_server
log.runtime(
f'Waiting on server to shutdown or be cancelled..\n'
f'{ipc_server}'
'Server-tn running until terminated\n'
)
# TODO? when if ever would we want/need this?
# with trio.CancelScope(shield=True):

View File

@ -127,10 +127,9 @@ async def start_listener(
Start a TCP socket listener on the given `TCPAddress`.
'''
log.info(
f'Attempting to bind TCP socket\n'
f'>[\n'
f'|_{addr}\n'
log.runtime(
f'Trying socket bind\n'
f'>[ {addr}\n'
)
# ?TODO, maybe we should just change the lower-level call this is
# using internall per-listener?
@ -145,11 +144,10 @@ async def start_listener(
assert len(listeners) == 1
listener = listeners[0]
host, port = listener.socket.getsockname()[:2]
bound_addr: TCPAddress = type(addr).from_addr((host, port))
log.info(
f'Listening on TCP socket\n'
f'[>\n'
f' |_{addr}\n'
f'[> {bound_addr}\n'
)
return listener