Compare commits

..

No commits in common. "49c61e40c7abb90d0f619ab0b246b934071dea3d" and "b71afdc6151831fe1d7c6258953c28dcb449e4ee" have entirely different histories.

7 changed files with 145 additions and 393 deletions

View File

@ -49,7 +49,7 @@ def test_basic_ipc_server(
) )
assert server._no_more_peers.is_set() assert server._no_more_peers.is_set()
eps: list[ipc._server.Endpoint] = await server.listen_on( eps: list[ipc.IPCEndpoint] = await server.listen_on(
accept_addrs=[rando_addr], accept_addrs=[rando_addr],
stream_handler_nursery=None, stream_handler_nursery=None,
) )

View File

@ -234,7 +234,7 @@ class Actor:
# state # state
self._cancel_complete = trio.Event() self._cancel_complete = trio.Event()
self._cancel_called_by: tuple[str, tuple]|None = None self._cancel_called_by_remote: tuple[str, tuple]|None = None
self._cancel_called: bool = False self._cancel_called: bool = False
# retreive and store parent `__main__` data which # retreive and store parent `__main__` data which
@ -346,118 +346,69 @@ class Actor:
def pid(self) -> int: def pid(self) -> int:
return self._aid.pid return self._aid.pid
@property
def repr_state(self) -> str:
if self.cancel_complete:
return 'cancelled'
elif canceller := self.cancel_caller:
return f' and cancel-called by {canceller}'
else:
return 'running'
def pformat( def pformat(
self, self,
ds: str = ': ', ds: str = ': ',
indent: int = 0, indent: int = 0,
privates: bool = False,
) -> str: ) -> str:
fields_sect_prefix: str = ' |_'
fmtstr: str = f'|_id: {self.aid.reprol()!r}\n' parent_uid: tuple|None = None
if privates:
aid_nest_prefix: str = '|_aid='
aid_field_repr: str = _pformat.nest_from_op(
input_op='',
text=pretty_struct.pformat(
struct=self.aid,
field_indent=2,
),
op_suffix='',
nest_prefix=aid_nest_prefix,
nest_indent=0,
)
fmtstr: str = f'{aid_field_repr}'
if rent_chan := self._parent_chan: if rent_chan := self._parent_chan:
fmtstr += ( parent_uid = rent_chan.uid
f"|_parent{ds}{rent_chan.aid.reprol()}\n"
)
peers: list = []
server: _server.IPCServer = self.ipc_server server: _server.IPCServer = self.ipc_server
ipc_server_sect: str = ''
if server: if server:
if privates: peers: list[tuple] = list(server._peer_connected)
server_repr: str = self._ipc_server.pformat(
privates=privates,
)
# create field ln as a key-header indented under
# and up to the section's key prefix.
# ^XXX if we were to indent `repr(Server)` to
# '<key>: '
# _here_^
server_repr: str = _pformat.nest_from_op(
input_op='', # nest as sub-obj
op_suffix='',
text=server_repr,
)
fmtstr += (
f"{server_repr}"
)
else:
fmtstr += (
f'|_ipc: {server.repr_state!r}\n'
)
fmtstr += ( # create field ln as a key-header indented under
f'|_rpc: {len(self._rpc_tasks)} active tasks\n' # and up to the section's key prefix.
) # field_ln_header: str = textwrap.indent(
# text=f"ipc_server{ds}",
# TODO, actually fix the .repr_state impl/output? # prefix=' '*len(fields_sect_prefix),
# append ipc-ctx state summary # )
# ctxs: dict = self._contexts # ^XXX if we were to indent `repr(Server)` to
# if ctxs: # '<key>: '
# ctx_states: dict[str, int] = {} # _here_^
# for ctx in self._contexts.values(): server_repr: str = textwrap.indent(
# ctx_state: str = ctx.repr_state text=self._ipc_server.pformat(),
# cnt = ctx_states.setdefault(ctx_state, 0) # prefix=' '*len(field_ln_header),
# ctx_states[ctx_state] = cnt + 1 prefix=' '*len(fields_sect_prefix),
)
# fmtstr += ( ipc_server_sect: str = (
# f" ctxs{ds}{ctx_states}\n" # f'{field_ln_header}\n'
# ) f'{server_repr}'
# runtime-state
task_name: str = '<dne>'
if task := self._task:
task_name: str = task.name
fmtstr += (
# TODO, this just like ctx?
f'|_state: {self.repr_state!r}\n'
f' task: {task_name}\n'
f' loglevel: {self.loglevel!r}\n'
f' subactors_spawned: {len(self._actoruid2nursery)}\n'
)
if not _state.is_root_process():
fmtstr += f' spawn_method: {self._spawn_method!r}\n'
if privates:
fmtstr += (
# f' actoruid2nursery{ds}{self._actoruid2nursery}\n'
f' cancel_complete{ds}{self._cancel_complete}\n'
f' cancel_called_by_remote{ds}{self._cancel_called_by}\n'
f' cancel_called{ds}{self._cancel_called}\n'
)
if fmtstr:
fmtstr: str = textwrap.indent(
text=fmtstr,
prefix=' '*(1 + indent),
) )
fmtstr: str = (
f' |_id: {self.aid!r}\n'
# f" aid{ds}{self.aid!r}\n"
f" parent{ds}{parent_uid}\n"
# f'\n'
f' |_ipc: {len(peers)!r} connected peers\n'
f" peers{ds}{peers!r}\n"
f"{ipc_server_sect}"
# f'\n'
f' |_rpc: {len(self._rpc_tasks)} tasks\n'
f" ctxs{ds}{len(self._contexts)}\n"
# f'\n'
f' |_runtime: ._task{ds}{self._task!r}\n'
f' _spawn_method{ds}{self._spawn_method}\n'
f' _actoruid2nursery{ds}{self._actoruid2nursery}\n'
f' _forkserver_info{ds}{self._forkserver_info}\n'
# f'\n'
f' |_state: "TODO: .repr_state()"\n'
f' _cancel_complete{ds}{self._cancel_complete}\n'
f' _cancel_called_by_remote{ds}{self._cancel_called_by_remote}\n'
f' _cancel_called{ds}{self._cancel_called}\n'
)
_repr: str = ( _repr: str = (
f'<{type(self).__name__}(\n' '<Actor(\n'
f'{fmtstr}' +
f')>\n' fmtstr
+
')>\n'
) )
if indent: if indent:
_repr: str = textwrap.indent( _repr: str = textwrap.indent(
@ -582,11 +533,11 @@ class Actor:
queue. queue.
''' '''
aid: msgtypes.Aid = chan.aid uid: tuple[str, str] = chan.uid
assert aid, f"`chan.aid` can't be {aid}" assert uid, f"`chan.uid` can't be {uid}"
try: try:
ctx: Context = self._contexts[( ctx: Context = self._contexts[(
aid.uid, uid,
cid, cid,
# TODO: how to determine this tho? # TODO: how to determine this tho?
@ -597,7 +548,7 @@ class Actor:
'Ignoring invalid IPC msg!?\n' 'Ignoring invalid IPC msg!?\n'
f'Ctx seems to not/no-longer exist??\n' f'Ctx seems to not/no-longer exist??\n'
f'\n' f'\n'
f'<=? {aid.reprol()!r}\n' f'<=? {uid}\n'
f' |_{pretty_struct.pformat(msg)}\n' f' |_{pretty_struct.pformat(msg)}\n'
) )
match msg: match msg:
@ -646,7 +597,6 @@ class Actor:
msging session's lifetime. msging session's lifetime.
''' '''
# ?TODO, use Aid here as well?
actor_uid = chan.uid actor_uid = chan.uid
assert actor_uid assert actor_uid
try: try:
@ -995,22 +945,6 @@ class Actor:
None, # self cancel all rpc tasks None, # self cancel all rpc tasks
) )
@property
def cancel_complete(self) -> bool:
return self._cancel_complete.is_set()
@property
def cancel_called(self) -> bool:
'''
Was this actor requested to cancel by a remote peer actor.
'''
return self._cancel_called_by is not None
@property
def cancel_caller(self) -> msgtypes.Aid|None:
return self._cancel_called_by
async def cancel( async def cancel(
self, self,
@ -1035,18 +969,20 @@ class Actor:
''' '''
( (
requesting_aid, # Aid requesting_uid,
requester_type, # str requester_type,
req_chan, req_chan,
log_meth, log_meth,
) = ( ) = (
req_chan.aid, req_chan.uid,
'peer', 'peer',
req_chan, req_chan,
log.cancel, log.cancel,
) if req_chan else ( ) if req_chan else (
# a self cancel of ALL rpc tasks # a self cancel of ALL rpc tasks
self.aid, self.uid,
'self', 'self',
self, self,
log.runtime, log.runtime,
@ -1054,14 +990,14 @@ class Actor:
# TODO: just use the new `Context.repr_rpc: str` (and # TODO: just use the new `Context.repr_rpc: str` (and
# other) repr fields instead of doing this all manual.. # other) repr fields instead of doing this all manual..
msg: str = ( msg: str = (
f'Actor-runtime cancel request from {requester_type!r}\n' f'Actor-runtime cancel request from {requester_type}\n\n'
f'<=c) {requesting_uid}\n'
f' |_{self}\n'
f'\n' f'\n'
f'<=c)\n'
f'{self}'
) )
# TODO: what happens here when we self-cancel tho? # TODO: what happens here when we self-cancel tho?
self._cancel_called_by: tuple = requesting_aid self._cancel_called_by_remote: tuple = requesting_uid
self._cancel_called = True self._cancel_called = True
# cancel all ongoing rpc tasks # cancel all ongoing rpc tasks
@ -1089,7 +1025,7 @@ class Actor:
# self-cancel **all** ongoing RPC tasks # self-cancel **all** ongoing RPC tasks
await self.cancel_rpc_tasks( await self.cancel_rpc_tasks(
req_aid=requesting_aid, req_uid=requesting_uid,
parent_chan=None, parent_chan=None,
) )
@ -1118,7 +1054,8 @@ class Actor:
self, self,
cid: str, cid: str,
parent_chan: Channel, parent_chan: Channel,
requesting_aid: msgtypes.Aid|None, requesting_uid: tuple[str, str]|None,
# ^^TODO! use the `Aid` directly here!
ipc_msg: dict|None|bool = False, ipc_msg: dict|None|bool = False,
@ -1156,7 +1093,7 @@ class Actor:
log.runtime( log.runtime(
'Cancel request for invalid RPC task.\n' 'Cancel request for invalid RPC task.\n'
'The task likely already completed or was never started!\n\n' 'The task likely already completed or was never started!\n\n'
f'<= canceller: {requesting_aid}\n' f'<= canceller: {requesting_uid}\n'
f'=> {cid}@{parent_chan.uid}\n' f'=> {cid}@{parent_chan.uid}\n'
f' |_{parent_chan}\n' f' |_{parent_chan}\n'
) )
@ -1164,7 +1101,7 @@ class Actor:
log.cancel( log.cancel(
'Rxed cancel request for RPC task\n' 'Rxed cancel request for RPC task\n'
f'{ctx._task!r} <=c) {requesting_aid}\n' f'{ctx._task!r} <=c) {requesting_uid}\n'
f'|_>> {ctx.repr_rpc}\n' f'|_>> {ctx.repr_rpc}\n'
# f'|_{ctx._task}\n' # f'|_{ctx._task}\n'
@ -1190,9 +1127,9 @@ class Actor:
) )
if ( if (
ctx._canceller is None ctx._canceller is None
and requesting_aid and requesting_uid
): ):
ctx._canceller: tuple = requesting_aid.uid ctx._canceller: tuple = requesting_uid
# TODO: pack the RPC `{'cmd': <blah>}` msg into a ctxc and # TODO: pack the RPC `{'cmd': <blah>}` msg into a ctxc and
# then raise and pack it here? # then raise and pack it here?
@ -1218,7 +1155,7 @@ class Actor:
# wait for _invoke to mark the task complete # wait for _invoke to mark the task complete
flow_info: str = ( flow_info: str = (
f'<= canceller: {requesting_aid}\n' f'<= canceller: {requesting_uid}\n'
f'=> ipc-parent: {parent_chan}\n' f'=> ipc-parent: {parent_chan}\n'
f'|_{ctx}\n' f'|_{ctx}\n'
) )
@ -1235,7 +1172,7 @@ class Actor:
async def cancel_rpc_tasks( async def cancel_rpc_tasks(
self, self,
req_aid: msgtypes.Aid, req_uid: tuple[str, str],
# NOTE: when None is passed we cancel **all** rpc # NOTE: when None is passed we cancel **all** rpc
# tasks running in this actor! # tasks running in this actor!
@ -1252,7 +1189,7 @@ class Actor:
if not tasks: if not tasks:
log.runtime( log.runtime(
'Actor has no cancellable RPC tasks?\n' 'Actor has no cancellable RPC tasks?\n'
f'<= canceller: {req_aid.reprol()}\n' f'<= canceller: {req_uid}\n'
) )
return return
@ -1292,7 +1229,7 @@ class Actor:
) )
log.cancel( log.cancel(
f'Cancelling {descr} RPC tasks\n\n' f'Cancelling {descr} RPC tasks\n\n'
f'<=c) {req_aid} [canceller]\n' f'<=c) {req_uid} [canceller]\n'
f'{rent_chan_repr}' f'{rent_chan_repr}'
f'c)=> {self.uid} [cancellee]\n' f'c)=> {self.uid} [cancellee]\n'
f' |_{self} [with {len(tasks)} tasks]\n' f' |_{self} [with {len(tasks)} tasks]\n'
@ -1320,7 +1257,7 @@ class Actor:
await self._cancel_task( await self._cancel_task(
cid, cid,
task_caller_chan, task_caller_chan,
requesting_aid=req_aid, requesting_uid=req_uid,
) )
if tasks: if tasks:
@ -1617,9 +1554,8 @@ async def async_main(
# 'Blocking on service nursery to exit..\n' # 'Blocking on service nursery to exit..\n'
) )
log.runtime( log.runtime(
'Service nursery complete\n' "Service nursery complete\n"
'\n' "Waiting on root nursery to complete"
'-> Waiting on root nursery to complete'
) )
# Blocks here as expected until the root nursery is # Blocks here as expected until the root nursery is
@ -1769,7 +1705,7 @@ async def async_main(
) )
teardown_report += ( teardown_report += (
'Actor runtime exited\n' 'Actor runtime exited\n'
f'{op_nested_actor_repr}' f'{op_nested_actor_repr}\n'
) )
log.info(teardown_report) log.info(teardown_report)

View File

@ -15,10 +15,8 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
''' '''
Pretty formatters for use throughout our internals. Pretty formatters for use throughout the code base.
Mostly handy for logging and exception message content.
Handy for logging and exception message content but also for `repr()`
in REPL(s).
''' '''
import sys import sys
@ -226,8 +224,8 @@ def pformat_cs(
field_prefix: str = ' |_', field_prefix: str = ' |_',
) -> str: ) -> str:
''' '''
Pretty format info about a `trio.CancelScope` including most of Pretty format info about a `trio.CancelScope` including most
its public state and `._cancel_status`. of its public state and `._cancel_status`.
The output can be modified to show a "var name" for the The output can be modified to show a "var name" for the
instance as a field prefix, just a simple str before each instance as a field prefix, just a simple str before each
@ -254,12 +252,9 @@ def pformat_cs(
def nest_from_op( def nest_from_op(
input_op: str, # TODO, Literal of all op-"symbols" from below? input_op: str, # TODO, Literal of all op-"symbols" from below?
text: str, text: str,
prefix_op: bool = True, # unset is to suffix the first line
# optionally suffix `text`, by def on a newline
op_suffix='\n',
nest_prefix: str = '|_', nest_prefix: str = '|_',
nest_indent: int|None = None, nest_indent: int = 0,
# XXX indent `next_prefix` "to-the-right-of" `input_op` # XXX indent `next_prefix` "to-the-right-of" `input_op`
# by this count of whitespaces (' '). # by this count of whitespaces (' ').
@ -353,15 +348,11 @@ def nest_from_op(
prefix=nest_indent*' ', prefix=nest_indent*' ',
) )
indented_tree_str: str = text tree_str_indent: int = len(nest_prefix)
tree_str_indent: int = 0 indented_tree_str: str = textwrap.indent(
if nest_indent != 0: text,
tree_str_indent: int = len(nest_prefix) prefix=' '*tree_str_indent
indented_tree_str: str = textwrap.indent( )
text,
prefix=' '*tree_str_indent
)
# inject any provided nesting-prefix chars # inject any provided nesting-prefix chars
# into the head of the first line. # into the head of the first line.
if nest_prefix: if nest_prefix:
@ -369,126 +360,7 @@ def nest_from_op(
f'{nest_prefix}{indented_tree_str[tree_str_indent:]}' f'{nest_prefix}{indented_tree_str[tree_str_indent:]}'
) )
if prefix_op:
return (
f'{input_op}{op_suffix}'
f'{indented_tree_str}'
)
else:
tree_lns: list[str] = indented_tree_str.splitlines()
first: str = tree_lns[0]
rest: str = '\n'.join(tree_lns[1:])
return (
f'{first}{input_op}{op_suffix}'
f'{rest}'
)
# ------ modden.repr ------
# XXX originally taken verbaatim from `modden.repr`
'''
More "multi-line" representation then the stdlib's `pprint` equivs.
'''
from inspect import (
FrameInfo,
stack,
)
import pprint
import reprlib
from typing import (
Callable,
)
def mk_repr(
**repr_kws,
) -> Callable[[str], str]:
'''
Allocate and deliver a `repr.Repr` instance with provided input
settings using the std-lib's `reprlib` mod,
* https://docs.python.org/3/library/reprlib.html
------ Ex. ------
An up to 6-layer-nested `dict` as multi-line:
- https://stackoverflow.com/a/79102479
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
'''
def_kws: dict[str, int] = dict(
indent=3, # indent used for repr of recursive objects
maxlevel=616, # recursion levels
maxdict=616, # max items shown for `dict`
maxlist=616, # max items shown for `dict`
maxstring=616, # match editor line-len limit
maxtuple=616, # match editor line-len limit
maxother=616, # match editor line-len limit
)
def_kws |= repr_kws
reprr = reprlib.Repr(**def_kws)
return reprr.repr
def ppfmt(
obj: object,
do_print: bool = False,
) -> str:
'''
The `pprint.pformat()` version of `pprint.pp()`, namely
a default `sort_dicts=False`.. (which i think should be
the normal default in the stdlib).
'''
pprepr: Callable = mk_repr()
repr_str: str = pprepr(obj)
if do_print:
return pprint.pp(repr_str)
return repr_str
pformat = ppfmt
def pfmt_frame_info(fi: FrameInfo) -> str:
'''
Like a std `inspect.FrameInfo.__repr__()` but multi-line..
'''
return ( return (
'FrameInfo(\n' f'{input_op}\n'
' frame={!r},\n' f'{indented_tree_str}'
' filename={!r},\n' )
' lineno={!r},\n'
' function={!r},\n'
' code_context={!r},\n'
' index={!r},\n'
' positions={!r})'
).format(
fi.frame,
fi.filename,
fi.lineno,
fi.function,
fi.code_context,
fi.index,
fi.positions
)
def pfmt_callstack(frames: int = 1) -> str:
'''
Generate a string of nested `inspect.FrameInfo` objects returned
from a `inspect.stack()` call such that only the `.frame` field
for each layer is pprinted.
'''
caller_frames: list[FrameInfo] = stack()[1:1+frames]
frames_str: str = ''
for i, frame_info in enumerate(caller_frames):
frames_str += textwrap.indent(
f'{frame_info.frame!r}\n',
prefix=' '*i,
)
return frames_str

View File

@ -196,12 +196,9 @@ class Channel:
self._transport.codec = orig self._transport.codec = orig
# TODO: do a .src/.dst: str for maddrs? # TODO: do a .src/.dst: str for maddrs?
def pformat( def pformat(self) -> str:
self,
privates: bool = False,
) -> str:
if not self._transport: if not self._transport:
return '<Channel( with inactive transport? )>' return '<Channel with inactive transport?>'
tpt: MsgTransport = self._transport tpt: MsgTransport = self._transport
tpt_name: str = type(tpt).__name__ tpt_name: str = type(tpt).__name__
@ -209,17 +206,14 @@ class Channel:
'connected' if self.connected() 'connected' if self.connected()
else 'closed' else 'closed'
) )
repr_str: str = ( return (
f'<Channel(\n' f'<Channel(\n'
f' |_status: {tpt_status!r}\n' f' |_status: {tpt_status!r}\n'
) + (
f' _closed={self._closed}\n' f' _closed={self._closed}\n'
f' _cancel_called={self._cancel_called}\n' f' _cancel_called={self._cancel_called}\n'
if privates else '' f'\n'
) + ( # peer-actor (processs) section f' |_peer: {self.aid}\n'
f' |_peer: {self.aid.reprol()!r}\n' f'\n'
if self.aid else '<unknown>'
) + (
f' |_msgstream: {tpt_name}\n' f' |_msgstream: {tpt_name}\n'
f' proto={tpt.laddr.proto_key!r}\n' f' proto={tpt.laddr.proto_key!r}\n'
f' layer={tpt.layer_key!r}\n' f' layer={tpt.layer_key!r}\n'
@ -229,13 +223,9 @@ class Channel:
f' stream={tpt.stream}\n' f' stream={tpt.stream}\n'
f' maddr={tpt.maddr!r}\n' f' maddr={tpt.maddr!r}\n'
f' drained={tpt.drained}\n' f' drained={tpt.drained}\n'
) + (
f' _send_lock={tpt._send_lock.statistics()}\n' f' _send_lock={tpt._send_lock.statistics()}\n'
if privates else '' f')>\n'
) + (
')>\n'
) )
return repr_str
# NOTE: making this return a value that can be passed to # NOTE: making this return a value that can be passed to
# `eval()` is entirely **optional** FYI! # `eval()` is entirely **optional** FYI!

View File

@ -289,7 +289,7 @@ async def maybe_wait_on_canced_subs(
# #
# -[x] maybe change to mod-func and rename for implied # -[x] maybe change to mod-func and rename for implied
# multi-transport semantics? # multi-transport semantics?
# -[ ] register each stream/tpt/chan with the owning `Endpoint` # -[ ] register each stream/tpt/chan with the owning `IPCEndpoint`
# so that we can query per tpt all peer contact infos? # so that we can query per tpt all peer contact infos?
# |_[ ] possibly provide a global viewing via a # |_[ ] possibly provide a global viewing via a
# `collections.ChainMap`? # `collections.ChainMap`?
@ -309,7 +309,7 @@ async def handle_stream_from_peer(
any `IPCServer.listen_on()` passed `stream_handler_tn: Nursery` any `IPCServer.listen_on()` passed `stream_handler_tn: Nursery`
such that it is invoked as, such that it is invoked as,
Endpoint.stream_handler_tn.start_soon( IPCEndpoint.stream_handler_tn.start_soon(
handle_stream, handle_stream,
stream, stream,
) )
@ -577,7 +577,7 @@ async def handle_stream_from_peer(
# finally block closure # finally block closure
class Endpoint(Struct): class IPCEndpoint(Struct):
''' '''
An instance of an IPC "bound" address where the lifetime of the An instance of an IPC "bound" address where the lifetime of the
"ability to accept connections" (from clients) and then handle "ability to accept connections" (from clients) and then handle
@ -636,7 +636,7 @@ class Endpoint(Struct):
) )
class Server(Struct): class IPCServer(Struct):
_parent_tn: Nursery _parent_tn: Nursery
_stream_handler_tn: Nursery _stream_handler_tn: Nursery
# level-triggered sig for whether "no peers are currently # level-triggered sig for whether "no peers are currently
@ -644,7 +644,7 @@ class Server(Struct):
# initialized with `.is_set() == True`. # initialized with `.is_set() == True`.
_no_more_peers: trio.Event _no_more_peers: trio.Event
_endpoints: list[Endpoint] = [] _endpoints: list[IPCEndpoint] = []
# connection tracking & mgmt # connection tracking & mgmt
_peers: defaultdict[ _peers: defaultdict[
@ -659,10 +659,10 @@ class Server(Struct):
# syncs for setup/teardown sequences # syncs for setup/teardown sequences
_shutdown: trio.Event|None = None _shutdown: trio.Event|None = None
# TODO, maybe just make `._endpoints: list[Endpoint]` and # TODO, maybe just make `._endpoints: list[IPCEndpoint]` and
# provide dict-views onto it? # provide dict-views onto it?
# @property # @property
# def addrs2eps(self) -> dict[Address, Endpoint]: # def addrs2eps(self) -> dict[Address, IPCEndpoint]:
# ... # ...
@property @property
@ -708,7 +708,7 @@ class Server(Struct):
await self._shutdown.wait() await self._shutdown.wait()
else: else:
tpt_protos: list[str] = [] tpt_protos: list[str] = []
ep: Endpoint ep: IPCEndpoint
for ep in self._endpoints: for ep in self._endpoints:
tpt_protos.append(ep.addr.proto_key) tpt_protos.append(ep.addr.proto_key)
@ -790,7 +790,7 @@ class Server(Struct):
def epsdict(self) -> dict[ def epsdict(self) -> dict[
Address, Address,
Endpoint, IPCEndpoint,
]: ]:
return { return {
ep.addr: ep ep.addr: ep
@ -804,7 +804,7 @@ class Server(Struct):
return ev.is_set() return ev.is_set()
def pformat(self) -> str: def pformat(self) -> str:
eps: list[Endpoint] = self._endpoints eps: list[IPCEndpoint] = self._endpoints
state_repr: str = ( state_repr: str = (
f'{len(eps)!r} IPC-endpoints active' f'{len(eps)!r} IPC-endpoints active'
@ -835,13 +835,13 @@ class Server(Struct):
# TODO? maybe allow shutting down a `.listen_on()`s worth of # TODO? maybe allow shutting down a `.listen_on()`s worth of
# listeners by cancelling the corresponding # listeners by cancelling the corresponding
# `Endpoint._listen_tn` only ? # `IPCEndpoint._listen_tn` only ?
# -[ ] in theory you could use this to # -[ ] in theory you could use this to
# "boot-and-wait-for-reconnect" of all current and connecting # "boot-and-wait-for-reconnect" of all current and connecting
# peers? # peers?
# |_ would require that the stream-handler is intercepted so we # |_ would require that the stream-handler is intercepted so we
# can intercept every `MsgTransport` (stream) and track per # can intercept every `MsgTransport` (stream) and track per
# `Endpoint` likely? # `IPCEndpoint` likely?
# #
# async def unlisten( # async def unlisten(
# self, # self,
@ -854,7 +854,7 @@ class Server(Struct):
*, *,
accept_addrs: list[tuple[str, int|str]]|None = None, accept_addrs: list[tuple[str, int|str]]|None = None,
stream_handler_nursery: Nursery|None = None, stream_handler_nursery: Nursery|None = None,
) -> list[Endpoint]: ) -> list[IPCEndpoint]:
''' '''
Start `SocketListeners` (i.e. bind and call `socket.listen()`) Start `SocketListeners` (i.e. bind and call `socket.listen()`)
for all IPC-transport-protocol specific `Address`-types for all IPC-transport-protocol specific `Address`-types
@ -888,7 +888,7 @@ class Server(Struct):
f'Binding to endpoints for,\n' f'Binding to endpoints for,\n'
f'{accept_addrs}\n' f'{accept_addrs}\n'
) )
eps: list[Endpoint] = await self._parent_tn.start( eps: list[IPCEndpoint] = await self._parent_tn.start(
partial( partial(
_serve_ipc_eps, _serve_ipc_eps,
server=self, server=self,
@ -904,7 +904,7 @@ class Server(Struct):
self._endpoints.extend(eps) self._endpoints.extend(eps)
# XXX, just a little bit of sanity # XXX, just a little bit of sanity
group_tn: Nursery|None = None group_tn: Nursery|None = None
ep: Endpoint ep: IPCEndpoint
for ep in eps: for ep in eps:
if ep.addr not in self.addrs: if ep.addr not in self.addrs:
breakpoint() breakpoint()
@ -917,10 +917,6 @@ class Server(Struct):
return eps return eps
# alias until we decide on final naming
IPCServer = Server
async def _serve_ipc_eps( async def _serve_ipc_eps(
*, *,
server: IPCServer, server: IPCServer,
@ -945,12 +941,12 @@ async def _serve_ipc_eps(
listen_tn: Nursery listen_tn: Nursery
async with trio.open_nursery() as listen_tn: async with trio.open_nursery() as listen_tn:
eps: list[Endpoint] = [] eps: list[IPCEndpoint] = []
# XXX NOTE, required to call `serve_listeners()` below. # XXX NOTE, required to call `serve_listeners()` below.
# ?TODO, maybe just pass `list(eps.values()` tho? # ?TODO, maybe just pass `list(eps.values()` tho?
listeners: list[trio.abc.Listener] = [] listeners: list[trio.abc.Listener] = []
for addr in listen_addrs: for addr in listen_addrs:
ep = Endpoint( ep = IPCEndpoint(
addr=addr, addr=addr,
listen_tn=listen_tn, listen_tn=listen_tn,
stream_handler_tn=stream_handler_tn, stream_handler_tn=stream_handler_tn,
@ -1014,7 +1010,7 @@ async def _serve_ipc_eps(
finally: finally:
if eps: if eps:
addr: Address addr: Address
ep: Endpoint ep: IPCEndpoint
for addr, ep in server.epsdict().items(): for addr, ep in server.epsdict().items():
ep.close_listener() ep.close_listener()
server._endpoints.remove(ep) server._endpoints.remove(ep)

View File

@ -20,7 +20,6 @@ Prettified version of `msgspec.Struct` for easier console grokin.
''' '''
from __future__ import annotations from __future__ import annotations
from collections import UserList from collections import UserList
import textwrap
from typing import ( from typing import (
Any, Any,
Iterator, Iterator,
@ -106,11 +105,27 @@ def iter_fields(struct: Struct) -> Iterator[
) )
def iter_struct_ppfmt_lines( def pformat(
struct: Struct, struct: Struct,
field_indent: int = 0, field_indent: int = 2,
) -> Iterator[tuple[str, str]]: indent: int = 0,
) -> str:
'''
Recursion-safe `pprint.pformat()` style formatting of
a `msgspec.Struct` for sane reading by a human using a REPL.
'''
# global whitespace indent
ws: str = ' '*indent
# field whitespace indent
field_ws: str = ' '*(field_indent + indent)
# qtn: str = ws + struct.__class__.__qualname__
qtn: str = struct.__class__.__qualname__
obj_str: str = '' # accumulator
fi: structs.FieldInfo fi: structs.FieldInfo
k: str k: str
v: Any v: Any
@ -120,18 +135,15 @@ def iter_struct_ppfmt_lines(
# ..]` over .__name__ == `Literal` but still get only the # ..]` over .__name__ == `Literal` but still get only the
# latter for simple types like `str | int | None` etc..? # latter for simple types like `str | int | None` etc..?
ft: type = fi.type ft: type = fi.type
typ_name: str = getattr( typ_name: str = getattr(ft, '__name__', str(ft))
ft,
'__name__',
str(ft)
).replace(' ', '')
# recurse to get sub-struct's `.pformat()` output Bo # recurse to get sub-struct's `.pformat()` output Bo
if isinstance(v, Struct): if isinstance(v, Struct):
yield from iter_struct_ppfmt_lines( val_str: str = v.pformat(
struct=v, indent=field_indent + indent,
field_indent=field_indent+field_indent, field_indent=indent + field_indent,
) )
else: else:
val_str: str = repr(v) val_str: str = repr(v)
@ -149,39 +161,8 @@ def iter_struct_ppfmt_lines(
# raise # raise
# return _Struct.__repr__(struct) # return _Struct.__repr__(struct)
yield ( # TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
' '*field_indent, # indented ws prefix obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
f'{k}: {typ_name} = {val_str},', # field's repr line content
)
def pformat(
struct: Struct,
field_indent: int = 2,
indent: int = 0,
) -> str:
'''
Recursion-safe `pprint.pformat()` style formatting of
a `msgspec.Struct` for sane reading by a human using a REPL.
'''
obj_str: str = '' # accumulator
for prefix, field_repr, in iter_struct_ppfmt_lines(
struct,
field_indent=field_indent,
):
obj_str += f'{prefix}{field_repr}\n'
# global whitespace indent
ws: str = ' '*indent
if indent:
obj_str: str = textwrap.indent(
text=obj_str,
prefix=ws,
)
# qtn: str = ws + struct.__class__.__qualname__
qtn: str = struct.__class__.__qualname__
return ( return (
f'{qtn}(\n' f'{qtn}(\n'

View File

@ -154,29 +154,6 @@ class Aid(
# should also include at least `.pid` (equiv to port for tcp) # should also include at least `.pid` (equiv to port for tcp)
# and/or host-part always? # and/or host-part always?
@property
def uid(self) -> tuple[str, str]:
'''
Legacy actor "unique-id" pair format.
'''
return (
self.name,
self.uuid,
)
def reprol(
self,
sin_uuid: bool = True,
) -> str:
if not sin_uuid:
return (
f'{self.name}[{self.uuid[:6]}]@{self.pid!r}'
)
return (
f'{self.name}@{self.pid!r}'
)
class SpawnSpec( class SpawnSpec(
pretty_struct.Struct, pretty_struct.Struct,