Compare commits

..

6 Commits

Author SHA1 Message Date
Tyler Goodlet 49c61e40c7 Refine `Actor` status iface, use `Aid` throughout
To simplify `.pformat()` output when the new `privates: bool` is unset
(the default) this adds new public attrs to wrap an actor's
cancellation status as well as provide a `.repr_state: str` (similar to
our equiv on `Context`). Rework `.pformat()` to render a much simplified
repr using all these new refinements.

Further, port the `.cancel()` method to use `.msg.types.Aid` for all
internal `requesting_uid` refs (now renamed with `_aid`) and in all
called downstream methods.

New cancel-state iface deats,
- rename `._cancel_called_by_remote` -> `._cancel_called_by` and expect
  it to be set as an `Aid`.
- add `.cancel_complete: bool` which flags whether `.cancel()` ran to
  completion.
- add `.cancel_called: bool` which just wraps `._cancel_called` (and
  which likely will just be dropped since we already have
  `._cancel_called_by`).
- add `.cancel_caller: Aid|None` which wraps `._cancel_called_by`.

In terms of using `Aid` in cancel methods,
- rename vars with `_aid` suffix in `.cancel()` (and wherever else).
- change `.cancel_rpc_tasks()` input param to `req_aid: msgtypes.Aid`.
- do the same for `._cancel_task()` and (for now until we adjust its
  internals as well) use the `Aid.uid` remap property when assigning
  `Context._canceller`.
- adjust all log msg refs to match obvi.
2025-06-23 19:36:11 -04:00
Tyler Goodlet 25f3cf795d Add flag to toggle private vars in `Channel.pformat()`
Call it `privates: bool` and only show certain internal instance vars
when set in the `repr()` output.
2025-06-23 12:08:05 -04:00
Tyler Goodlet e2b7924898 Refactor `pretty_struct.pformat()` rendering
Adding an underlying `iter_struct_ppfmt_lines()` (which can also be
called directly) to iter-render field-lines-pairs; it's now called from
the top level `.pformat()`. The use case is to allow more granular
control for rendering runtime primitive (like `Actor.pformat()`) reprs
depending on log-level/config, oh and using `textwrap` for indenting.
2025-06-22 22:09:37 -04:00
Tyler Goodlet c559f80f08 Extend `.msg.types.Aid` method interface
Providing the legacy `.uid -> tuple` style id (since still used for the
`Actor._contexts` table) and a `repr-one-line` method `.reprol() -> str`
for rendering a compact unique actor ID summary (useful in
logging/.pformat()s at the least).
2025-06-22 21:55:37 -04:00
Tyler Goodlet 6a6f55cee0 Mv in `modden.repr` content, refine `nest_from_op()`
Since I'd like to use some `reprlib` formatting which `modden` already
implemented (and it's a main dependee project), figured I'd just bring
it all into `.devx.pformat` for now.

Also some more tweaks to `nest_from_op()` namely for correctness and
some additional paarams,
- an explicit `op_suffix: str = '\n'` (instead of always assuming
  `f'{input_op}\n'`).
- add `prefix_op: bool` so that, when unset, the `input_op` is instead
  used as a suffix to the first line of `text`.
- default `next_indent = None` such that when set (and not null) we use
  that exact ws-indent instead of calculating it from the
  `len(nest_prefix)` allowing for specifying a `0`-indent easily.
2025-06-22 20:57:51 -04:00
Tyler Goodlet bff32b0ad7 Drop 'IPC' prefix from `._server` types
We already have the `.ipc` sub-pkg name so it seems a bit
redundant/noisy for a namespace path Bp

Leave an alias for the `Server` rn since it's already used in a few
other internal mods.. will likely rename later if everyone is cool with
it..
2025-06-17 23:33:58 -04:00
7 changed files with 394 additions and 146 deletions

View File

@ -49,7 +49,7 @@ def test_basic_ipc_server(
)
assert server._no_more_peers.is_set()
eps: list[ipc.IPCEndpoint] = await server.listen_on(
eps: list[ipc._server.Endpoint] = await server.listen_on(
accept_addrs=[rando_addr],
stream_handler_nursery=None,
)

View File

@ -234,7 +234,7 @@ class Actor:
# state
self._cancel_complete = trio.Event()
self._cancel_called_by_remote: tuple[str, tuple]|None = None
self._cancel_called_by: tuple[str, tuple]|None = None
self._cancel_called: bool = False
# retreive and store parent `__main__` data which
@ -346,69 +346,118 @@ class Actor:
def pid(self) -> int:
return self._aid.pid
@property
def repr_state(self) -> str:
if self.cancel_complete:
return 'cancelled'
elif canceller := self.cancel_caller:
return f' and cancel-called by {canceller}'
else:
return 'running'
def pformat(
self,
ds: str = ': ',
indent: int = 0,
privates: bool = False,
) -> str:
fields_sect_prefix: str = ' |_'
parent_uid: tuple|None = None
fmtstr: str = f'|_id: {self.aid.reprol()!r}\n'
if privates:
aid_nest_prefix: str = '|_aid='
aid_field_repr: str = _pformat.nest_from_op(
input_op='',
text=pretty_struct.pformat(
struct=self.aid,
field_indent=2,
),
op_suffix='',
nest_prefix=aid_nest_prefix,
nest_indent=0,
)
fmtstr: str = f'{aid_field_repr}'
if rent_chan := self._parent_chan:
parent_uid = rent_chan.uid
fmtstr += (
f"|_parent{ds}{rent_chan.aid.reprol()}\n"
)
peers: list = []
server: _server.IPCServer = self.ipc_server
ipc_server_sect: str = ''
if server:
peers: list[tuple] = list(server._peer_connected)
if privates:
server_repr: str = self._ipc_server.pformat(
privates=privates,
)
# create field ln as a key-header indented under
# and up to the section's key prefix.
# field_ln_header: str = textwrap.indent(
# text=f"ipc_server{ds}",
# prefix=' '*len(fields_sect_prefix),
# )
# ^XXX if we were to indent `repr(Server)` to
# '<key>: '
# _here_^
server_repr: str = textwrap.indent(
text=self._ipc_server.pformat(),
# prefix=' '*len(field_ln_header),
prefix=' '*len(fields_sect_prefix),
server_repr: str = _pformat.nest_from_op(
input_op='', # nest as sub-obj
op_suffix='',
text=server_repr,
)
ipc_server_sect: str = (
# f'{field_ln_header}\n'
f'{server_repr}'
fmtstr += (
f"{server_repr}"
)
else:
fmtstr += (
f'|_ipc: {server.repr_state!r}\n'
)
fmtstr: str = (
f' |_id: {self.aid!r}\n'
# f" aid{ds}{self.aid!r}\n"
f" parent{ds}{parent_uid}\n"
# f'\n'
f' |_ipc: {len(peers)!r} connected peers\n'
f" peers{ds}{peers!r}\n"
f"{ipc_server_sect}"
# f'\n'
f' |_rpc: {len(self._rpc_tasks)} tasks\n'
f" ctxs{ds}{len(self._contexts)}\n"
# f'\n'
f' |_runtime: ._task{ds}{self._task!r}\n'
f' _spawn_method{ds}{self._spawn_method}\n'
f' _actoruid2nursery{ds}{self._actoruid2nursery}\n'
f' _forkserver_info{ds}{self._forkserver_info}\n'
# f'\n'
f' |_state: "TODO: .repr_state()"\n'
f' _cancel_complete{ds}{self._cancel_complete}\n'
f' _cancel_called_by_remote{ds}{self._cancel_called_by_remote}\n'
f' _cancel_called{ds}{self._cancel_called}\n'
fmtstr += (
f'|_rpc: {len(self._rpc_tasks)} active tasks\n'
)
# TODO, actually fix the .repr_state impl/output?
# append ipc-ctx state summary
# ctxs: dict = self._contexts
# if ctxs:
# ctx_states: dict[str, int] = {}
# for ctx in self._contexts.values():
# ctx_state: str = ctx.repr_state
# cnt = ctx_states.setdefault(ctx_state, 0)
# ctx_states[ctx_state] = cnt + 1
# fmtstr += (
# f" ctxs{ds}{ctx_states}\n"
# )
# runtime-state
task_name: str = '<dne>'
if task := self._task:
task_name: str = task.name
fmtstr += (
# TODO, this just like ctx?
f'|_state: {self.repr_state!r}\n'
f' task: {task_name}\n'
f' loglevel: {self.loglevel!r}\n'
f' subactors_spawned: {len(self._actoruid2nursery)}\n'
)
if not _state.is_root_process():
fmtstr += f' spawn_method: {self._spawn_method!r}\n'
if privates:
fmtstr += (
# f' actoruid2nursery{ds}{self._actoruid2nursery}\n'
f' cancel_complete{ds}{self._cancel_complete}\n'
f' cancel_called_by_remote{ds}{self._cancel_called_by}\n'
f' cancel_called{ds}{self._cancel_called}\n'
)
if fmtstr:
fmtstr: str = textwrap.indent(
text=fmtstr,
prefix=' '*(1 + indent),
)
_repr: str = (
'<Actor(\n'
+
fmtstr
+
')>\n'
f'<{type(self).__name__}(\n'
f'{fmtstr}'
f')>\n'
)
if indent:
_repr: str = textwrap.indent(
@ -533,11 +582,11 @@ class Actor:
queue.
'''
uid: tuple[str, str] = chan.uid
assert uid, f"`chan.uid` can't be {uid}"
aid: msgtypes.Aid = chan.aid
assert aid, f"`chan.aid` can't be {aid}"
try:
ctx: Context = self._contexts[(
uid,
aid.uid,
cid,
# TODO: how to determine this tho?
@ -548,7 +597,7 @@ class Actor:
'Ignoring invalid IPC msg!?\n'
f'Ctx seems to not/no-longer exist??\n'
f'\n'
f'<=? {uid}\n'
f'<=? {aid.reprol()!r}\n'
f' |_{pretty_struct.pformat(msg)}\n'
)
match msg:
@ -597,6 +646,7 @@ class Actor:
msging session's lifetime.
'''
# ?TODO, use Aid here as well?
actor_uid = chan.uid
assert actor_uid
try:
@ -945,6 +995,22 @@ class Actor:
None, # self cancel all rpc tasks
)
@property
def cancel_complete(self) -> bool:
return self._cancel_complete.is_set()
@property
def cancel_called(self) -> bool:
'''
Was this actor requested to cancel by a remote peer actor.
'''
return self._cancel_called_by is not None
@property
def cancel_caller(self) -> msgtypes.Aid|None:
return self._cancel_called_by
async def cancel(
self,
@ -969,20 +1035,18 @@ class Actor:
'''
(
requesting_uid,
requester_type,
requesting_aid, # Aid
requester_type, # str
req_chan,
log_meth,
) = (
req_chan.uid,
req_chan.aid,
'peer',
req_chan,
log.cancel,
) if req_chan else (
# a self cancel of ALL rpc tasks
self.uid,
self.aid,
'self',
self,
log.runtime,
@ -990,14 +1054,14 @@ class Actor:
# TODO: just use the new `Context.repr_rpc: str` (and
# other) repr fields instead of doing this all manual..
msg: str = (
f'Actor-runtime cancel request from {requester_type}\n\n'
f'<=c) {requesting_uid}\n'
f' |_{self}\n'
f'Actor-runtime cancel request from {requester_type!r}\n'
f'\n'
f'<=c)\n'
f'{self}'
)
# TODO: what happens here when we self-cancel tho?
self._cancel_called_by_remote: tuple = requesting_uid
self._cancel_called_by: tuple = requesting_aid
self._cancel_called = True
# cancel all ongoing rpc tasks
@ -1025,7 +1089,7 @@ class Actor:
# self-cancel **all** ongoing RPC tasks
await self.cancel_rpc_tasks(
req_uid=requesting_uid,
req_aid=requesting_aid,
parent_chan=None,
)
@ -1054,8 +1118,7 @@ class Actor:
self,
cid: str,
parent_chan: Channel,
requesting_uid: tuple[str, str]|None,
# ^^TODO! use the `Aid` directly here!
requesting_aid: msgtypes.Aid|None,
ipc_msg: dict|None|bool = False,
@ -1093,7 +1156,7 @@ class Actor:
log.runtime(
'Cancel request for invalid RPC task.\n'
'The task likely already completed or was never started!\n\n'
f'<= canceller: {requesting_uid}\n'
f'<= canceller: {requesting_aid}\n'
f'=> {cid}@{parent_chan.uid}\n'
f' |_{parent_chan}\n'
)
@ -1101,7 +1164,7 @@ class Actor:
log.cancel(
'Rxed cancel request for RPC task\n'
f'{ctx._task!r} <=c) {requesting_uid}\n'
f'{ctx._task!r} <=c) {requesting_aid}\n'
f'|_>> {ctx.repr_rpc}\n'
# f'|_{ctx._task}\n'
@ -1127,9 +1190,9 @@ class Actor:
)
if (
ctx._canceller is None
and requesting_uid
and requesting_aid
):
ctx._canceller: tuple = requesting_uid
ctx._canceller: tuple = requesting_aid.uid
# TODO: pack the RPC `{'cmd': <blah>}` msg into a ctxc and
# then raise and pack it here?
@ -1155,7 +1218,7 @@ class Actor:
# wait for _invoke to mark the task complete
flow_info: str = (
f'<= canceller: {requesting_uid}\n'
f'<= canceller: {requesting_aid}\n'
f'=> ipc-parent: {parent_chan}\n'
f'|_{ctx}\n'
)
@ -1172,7 +1235,7 @@ class Actor:
async def cancel_rpc_tasks(
self,
req_uid: tuple[str, str],
req_aid: msgtypes.Aid,
# NOTE: when None is passed we cancel **all** rpc
# tasks running in this actor!
@ -1189,7 +1252,7 @@ class Actor:
if not tasks:
log.runtime(
'Actor has no cancellable RPC tasks?\n'
f'<= canceller: {req_uid}\n'
f'<= canceller: {req_aid.reprol()}\n'
)
return
@ -1229,7 +1292,7 @@ class Actor:
)
log.cancel(
f'Cancelling {descr} RPC tasks\n\n'
f'<=c) {req_uid} [canceller]\n'
f'<=c) {req_aid} [canceller]\n'
f'{rent_chan_repr}'
f'c)=> {self.uid} [cancellee]\n'
f' |_{self} [with {len(tasks)} tasks]\n'
@ -1257,7 +1320,7 @@ class Actor:
await self._cancel_task(
cid,
task_caller_chan,
requesting_uid=req_uid,
requesting_aid=req_aid,
)
if tasks:
@ -1554,8 +1617,9 @@ async def async_main(
# 'Blocking on service nursery to exit..\n'
)
log.runtime(
"Service nursery complete\n"
"Waiting on root nursery to complete"
'Service nursery complete\n'
'\n'
'-> Waiting on root nursery to complete'
)
# Blocks here as expected until the root nursery is
@ -1705,7 +1769,7 @@ async def async_main(
)
teardown_report += (
'Actor runtime exited\n'
f'{op_nested_actor_repr}\n'
f'{op_nested_actor_repr}'
)
log.info(teardown_report)

View File

@ -15,8 +15,10 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Pretty formatters for use throughout the code base.
Mostly handy for logging and exception message content.
Pretty formatters for use throughout our internals.
Handy for logging and exception message content but also for `repr()`
in REPL(s).
'''
import sys
@ -224,8 +226,8 @@ def pformat_cs(
field_prefix: str = ' |_',
) -> str:
'''
Pretty format info about a `trio.CancelScope` including most
of its public state and `._cancel_status`.
Pretty format info about a `trio.CancelScope` including most of
its public state and `._cancel_status`.
The output can be modified to show a "var name" for the
instance as a field prefix, just a simple str before each
@ -252,9 +254,12 @@ def pformat_cs(
def nest_from_op(
input_op: str, # TODO, Literal of all op-"symbols" from below?
text: str,
prefix_op: bool = True, # unset is to suffix the first line
# optionally suffix `text`, by def on a newline
op_suffix='\n',
nest_prefix: str = '|_',
nest_indent: int = 0,
nest_indent: int|None = None,
# XXX indent `next_prefix` "to-the-right-of" `input_op`
# by this count of whitespaces (' ').
@ -348,11 +353,15 @@ def nest_from_op(
prefix=nest_indent*' ',
)
indented_tree_str: str = text
tree_str_indent: int = 0
if nest_indent != 0:
tree_str_indent: int = len(nest_prefix)
indented_tree_str: str = textwrap.indent(
text,
prefix=' '*tree_str_indent
)
# inject any provided nesting-prefix chars
# into the head of the first line.
if nest_prefix:
@ -360,7 +369,126 @@ def nest_from_op(
f'{nest_prefix}{indented_tree_str[tree_str_indent:]}'
)
if prefix_op:
return (
f'{input_op}\n'
f'{input_op}{op_suffix}'
f'{indented_tree_str}'
)
else:
tree_lns: list[str] = indented_tree_str.splitlines()
first: str = tree_lns[0]
rest: str = '\n'.join(tree_lns[1:])
return (
f'{first}{input_op}{op_suffix}'
f'{rest}'
)
# ------ modden.repr ------
# XXX originally taken verbaatim from `modden.repr`
'''
More "multi-line" representation then the stdlib's `pprint` equivs.
'''
from inspect import (
FrameInfo,
stack,
)
import pprint
import reprlib
from typing import (
Callable,
)
def mk_repr(
**repr_kws,
) -> Callable[[str], str]:
'''
Allocate and deliver a `repr.Repr` instance with provided input
settings using the std-lib's `reprlib` mod,
* https://docs.python.org/3/library/reprlib.html
------ Ex. ------
An up to 6-layer-nested `dict` as multi-line:
- https://stackoverflow.com/a/79102479
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
'''
def_kws: dict[str, int] = dict(
indent=3, # indent used for repr of recursive objects
maxlevel=616, # recursion levels
maxdict=616, # max items shown for `dict`
maxlist=616, # max items shown for `dict`
maxstring=616, # match editor line-len limit
maxtuple=616, # match editor line-len limit
maxother=616, # match editor line-len limit
)
def_kws |= repr_kws
reprr = reprlib.Repr(**def_kws)
return reprr.repr
def ppfmt(
obj: object,
do_print: bool = False,
) -> str:
'''
The `pprint.pformat()` version of `pprint.pp()`, namely
a default `sort_dicts=False`.. (which i think should be
the normal default in the stdlib).
'''
pprepr: Callable = mk_repr()
repr_str: str = pprepr(obj)
if do_print:
return pprint.pp(repr_str)
return repr_str
pformat = ppfmt
def pfmt_frame_info(fi: FrameInfo) -> str:
'''
Like a std `inspect.FrameInfo.__repr__()` but multi-line..
'''
return (
'FrameInfo(\n'
' frame={!r},\n'
' filename={!r},\n'
' lineno={!r},\n'
' function={!r},\n'
' code_context={!r},\n'
' index={!r},\n'
' positions={!r})'
).format(
fi.frame,
fi.filename,
fi.lineno,
fi.function,
fi.code_context,
fi.index,
fi.positions
)
def pfmt_callstack(frames: int = 1) -> str:
'''
Generate a string of nested `inspect.FrameInfo` objects returned
from a `inspect.stack()` call such that only the `.frame` field
for each layer is pprinted.
'''
caller_frames: list[FrameInfo] = stack()[1:1+frames]
frames_str: str = ''
for i, frame_info in enumerate(caller_frames):
frames_str += textwrap.indent(
f'{frame_info.frame!r}\n',
prefix=' '*i,
)
return frames_str

View File

@ -196,9 +196,12 @@ class Channel:
self._transport.codec = orig
# TODO: do a .src/.dst: str for maddrs?
def pformat(self) -> str:
def pformat(
self,
privates: bool = False,
) -> str:
if not self._transport:
return '<Channel with inactive transport?>'
return '<Channel( with inactive transport? )>'
tpt: MsgTransport = self._transport
tpt_name: str = type(tpt).__name__
@ -206,14 +209,17 @@ class Channel:
'connected' if self.connected()
else 'closed'
)
return (
repr_str: str = (
f'<Channel(\n'
f' |_status: {tpt_status!r}\n'
) + (
f' _closed={self._closed}\n'
f' _cancel_called={self._cancel_called}\n'
f'\n'
f' |_peer: {self.aid}\n'
f'\n'
if privates else ''
) + ( # peer-actor (processs) section
f' |_peer: {self.aid.reprol()!r}\n'
if self.aid else '<unknown>'
) + (
f' |_msgstream: {tpt_name}\n'
f' proto={tpt.laddr.proto_key!r}\n'
f' layer={tpt.layer_key!r}\n'
@ -223,9 +229,13 @@ class Channel:
f' stream={tpt.stream}\n'
f' maddr={tpt.maddr!r}\n'
f' drained={tpt.drained}\n'
) + (
f' _send_lock={tpt._send_lock.statistics()}\n'
f')>\n'
if privates else ''
) + (
')>\n'
)
return repr_str
# NOTE: making this return a value that can be passed to
# `eval()` is entirely **optional** FYI!

View File

@ -289,7 +289,7 @@ async def maybe_wait_on_canced_subs(
#
# -[x] maybe change to mod-func and rename for implied
# multi-transport semantics?
# -[ ] register each stream/tpt/chan with the owning `IPCEndpoint`
# -[ ] register each stream/tpt/chan with the owning `Endpoint`
# so that we can query per tpt all peer contact infos?
# |_[ ] possibly provide a global viewing via a
# `collections.ChainMap`?
@ -309,7 +309,7 @@ async def handle_stream_from_peer(
any `IPCServer.listen_on()` passed `stream_handler_tn: Nursery`
such that it is invoked as,
IPCEndpoint.stream_handler_tn.start_soon(
Endpoint.stream_handler_tn.start_soon(
handle_stream,
stream,
)
@ -577,7 +577,7 @@ async def handle_stream_from_peer(
# finally block closure
class IPCEndpoint(Struct):
class Endpoint(Struct):
'''
An instance of an IPC "bound" address where the lifetime of the
"ability to accept connections" (from clients) and then handle
@ -636,7 +636,7 @@ class IPCEndpoint(Struct):
)
class IPCServer(Struct):
class Server(Struct):
_parent_tn: Nursery
_stream_handler_tn: Nursery
# level-triggered sig for whether "no peers are currently
@ -644,7 +644,7 @@ class IPCServer(Struct):
# initialized with `.is_set() == True`.
_no_more_peers: trio.Event
_endpoints: list[IPCEndpoint] = []
_endpoints: list[Endpoint] = []
# connection tracking & mgmt
_peers: defaultdict[
@ -659,10 +659,10 @@ class IPCServer(Struct):
# syncs for setup/teardown sequences
_shutdown: trio.Event|None = None
# TODO, maybe just make `._endpoints: list[IPCEndpoint]` and
# TODO, maybe just make `._endpoints: list[Endpoint]` and
# provide dict-views onto it?
# @property
# def addrs2eps(self) -> dict[Address, IPCEndpoint]:
# def addrs2eps(self) -> dict[Address, Endpoint]:
# ...
@property
@ -708,7 +708,7 @@ class IPCServer(Struct):
await self._shutdown.wait()
else:
tpt_protos: list[str] = []
ep: IPCEndpoint
ep: Endpoint
for ep in self._endpoints:
tpt_protos.append(ep.addr.proto_key)
@ -790,7 +790,7 @@ class IPCServer(Struct):
def epsdict(self) -> dict[
Address,
IPCEndpoint,
Endpoint,
]:
return {
ep.addr: ep
@ -804,7 +804,7 @@ class IPCServer(Struct):
return ev.is_set()
def pformat(self) -> str:
eps: list[IPCEndpoint] = self._endpoints
eps: list[Endpoint] = self._endpoints
state_repr: str = (
f'{len(eps)!r} IPC-endpoints active'
@ -835,13 +835,13 @@ class IPCServer(Struct):
# TODO? maybe allow shutting down a `.listen_on()`s worth of
# listeners by cancelling the corresponding
# `IPCEndpoint._listen_tn` only ?
# `Endpoint._listen_tn` only ?
# -[ ] in theory you could use this to
# "boot-and-wait-for-reconnect" of all current and connecting
# peers?
# |_ would require that the stream-handler is intercepted so we
# can intercept every `MsgTransport` (stream) and track per
# `IPCEndpoint` likely?
# `Endpoint` likely?
#
# async def unlisten(
# self,
@ -854,7 +854,7 @@ class IPCServer(Struct):
*,
accept_addrs: list[tuple[str, int|str]]|None = None,
stream_handler_nursery: Nursery|None = None,
) -> list[IPCEndpoint]:
) -> list[Endpoint]:
'''
Start `SocketListeners` (i.e. bind and call `socket.listen()`)
for all IPC-transport-protocol specific `Address`-types
@ -888,7 +888,7 @@ class IPCServer(Struct):
f'Binding to endpoints for,\n'
f'{accept_addrs}\n'
)
eps: list[IPCEndpoint] = await self._parent_tn.start(
eps: list[Endpoint] = await self._parent_tn.start(
partial(
_serve_ipc_eps,
server=self,
@ -904,7 +904,7 @@ class IPCServer(Struct):
self._endpoints.extend(eps)
# XXX, just a little bit of sanity
group_tn: Nursery|None = None
ep: IPCEndpoint
ep: Endpoint
for ep in eps:
if ep.addr not in self.addrs:
breakpoint()
@ -917,6 +917,10 @@ class IPCServer(Struct):
return eps
# alias until we decide on final naming
IPCServer = Server
async def _serve_ipc_eps(
*,
server: IPCServer,
@ -941,12 +945,12 @@ async def _serve_ipc_eps(
listen_tn: Nursery
async with trio.open_nursery() as listen_tn:
eps: list[IPCEndpoint] = []
eps: list[Endpoint] = []
# XXX NOTE, required to call `serve_listeners()` below.
# ?TODO, maybe just pass `list(eps.values()` tho?
listeners: list[trio.abc.Listener] = []
for addr in listen_addrs:
ep = IPCEndpoint(
ep = Endpoint(
addr=addr,
listen_tn=listen_tn,
stream_handler_tn=stream_handler_tn,
@ -1010,7 +1014,7 @@ async def _serve_ipc_eps(
finally:
if eps:
addr: Address
ep: IPCEndpoint
ep: Endpoint
for addr, ep in server.epsdict().items():
ep.close_listener()
server._endpoints.remove(ep)

View File

@ -20,6 +20,7 @@ Prettified version of `msgspec.Struct` for easier console grokin.
'''
from __future__ import annotations
from collections import UserList
import textwrap
from typing import (
Any,
Iterator,
@ -105,27 +106,11 @@ def iter_fields(struct: Struct) -> Iterator[
)
def pformat(
def iter_struct_ppfmt_lines(
struct: Struct,
field_indent: int = 2,
indent: int = 0,
field_indent: int = 0,
) -> Iterator[tuple[str, str]]:
) -> str:
'''
Recursion-safe `pprint.pformat()` style formatting of
a `msgspec.Struct` for sane reading by a human using a REPL.
'''
# global whitespace indent
ws: str = ' '*indent
# field whitespace indent
field_ws: str = ' '*(field_indent + indent)
# qtn: str = ws + struct.__class__.__qualname__
qtn: str = struct.__class__.__qualname__
obj_str: str = '' # accumulator
fi: structs.FieldInfo
k: str
v: Any
@ -135,15 +120,18 @@ def pformat(
# ..]` over .__name__ == `Literal` but still get only the
# latter for simple types like `str | int | None` etc..?
ft: type = fi.type
typ_name: str = getattr(ft, '__name__', str(ft))
typ_name: str = getattr(
ft,
'__name__',
str(ft)
).replace(' ', '')
# recurse to get sub-struct's `.pformat()` output Bo
if isinstance(v, Struct):
val_str: str = v.pformat(
indent=field_indent + indent,
field_indent=indent + field_indent,
yield from iter_struct_ppfmt_lines(
struct=v,
field_indent=field_indent+field_indent,
)
else:
val_str: str = repr(v)
@ -161,8 +149,39 @@ def pformat(
# raise
# return _Struct.__repr__(struct)
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
yield (
' '*field_indent, # indented ws prefix
f'{k}: {typ_name} = {val_str},', # field's repr line content
)
def pformat(
struct: Struct,
field_indent: int = 2,
indent: int = 0,
) -> str:
'''
Recursion-safe `pprint.pformat()` style formatting of
a `msgspec.Struct` for sane reading by a human using a REPL.
'''
obj_str: str = '' # accumulator
for prefix, field_repr, in iter_struct_ppfmt_lines(
struct,
field_indent=field_indent,
):
obj_str += f'{prefix}{field_repr}\n'
# global whitespace indent
ws: str = ' '*indent
if indent:
obj_str: str = textwrap.indent(
text=obj_str,
prefix=ws,
)
# qtn: str = ws + struct.__class__.__qualname__
qtn: str = struct.__class__.__qualname__
return (
f'{qtn}(\n'

View File

@ -154,6 +154,29 @@ class Aid(
# should also include at least `.pid` (equiv to port for tcp)
# and/or host-part always?
@property
def uid(self) -> tuple[str, str]:
'''
Legacy actor "unique-id" pair format.
'''
return (
self.name,
self.uuid,
)
def reprol(
self,
sin_uuid: bool = True,
) -> str:
if not sin_uuid:
return (
f'{self.name}[{self.uuid[:6]}]@{self.pid!r}'
)
return (
f'{self.name}@{self.pid!r}'
)
class SpawnSpec(
pretty_struct.Struct,