Compare commits

..

6 Commits

Author SHA1 Message Date
Tyler Goodlet 49c61e40c7 Refine `Actor` status iface, use `Aid` throughout
To simplify `.pformat()` output when the new `privates: bool` is unset
(the default) this adds new public attrs to wrap an actor's
cancellation status as well as provide a `.repr_state: str` (similar to
our equiv on `Context`). Rework `.pformat()` to render a much simplified
repr using all these new refinements.

Further, port the `.cancel()` method to use `.msg.types.Aid` for all
internal `requesting_uid` refs (now renamed with `_aid`) and in all
called downstream methods.

New cancel-state iface deats,
- rename `._cancel_called_by_remote` -> `._cancel_called_by` and expect
  it to be set as an `Aid`.
- add `.cancel_complete: bool` which flags whether `.cancel()` ran to
  completion.
- add `.cancel_called: bool` which just wraps `._cancel_called` (and
  which likely will just be dropped since we already have
  `._cancel_called_by`).
- add `.cancel_caller: Aid|None` which wraps `._cancel_called_by`.

In terms of using `Aid` in cancel methods,
- rename vars with `_aid` suffix in `.cancel()` (and wherever else).
- change `.cancel_rpc_tasks()` input param to `req_aid: msgtypes.Aid`.
- do the same for `._cancel_task()` and (for now until we adjust its
  internals as well) use the `Aid.uid` remap property when assigning
  `Context._canceller`.
- adjust all log msg refs to match obvi.
2025-06-23 19:36:11 -04:00
Tyler Goodlet 25f3cf795d Add flag to toggle private vars in `Channel.pformat()`
Call it `privates: bool` and only show certain internal instance vars
when set in the `repr()` output.
2025-06-23 12:08:05 -04:00
Tyler Goodlet e2b7924898 Refactor `pretty_struct.pformat()` rendering
Adding an underlying `iter_struct_ppfmt_lines()` (which can also be
called directly) to iter-render field-lines-pairs; it's now called from
the top level `.pformat()`. The use case is to allow more granular
control for rendering runtime primitive (like `Actor.pformat()`) reprs
depending on log-level/config, oh and using `textwrap` for indenting.
2025-06-22 22:09:37 -04:00
Tyler Goodlet c559f80f08 Extend `.msg.types.Aid` method interface
Providing the legacy `.uid -> tuple` style id (since still used for the
`Actor._contexts` table) and a `repr-one-line` method `.reprol() -> str`
for rendering a compact unique actor ID summary (useful in
logging/.pformat()s at the least).
2025-06-22 21:55:37 -04:00
Tyler Goodlet 6a6f55cee0 Mv in `modden.repr` content, refine `nest_from_op()`
Since I'd like to use some `reprlib` formatting which `modden` already
implemented (and it's a main dependee project), figured I'd just bring
it all into `.devx.pformat` for now.

Also some more tweaks to `nest_from_op()` namely for correctness and
some additional paarams,
- an explicit `op_suffix: str = '\n'` (instead of always assuming
  `f'{input_op}\n'`).
- add `prefix_op: bool` so that, when unset, the `input_op` is instead
  used as a suffix to the first line of `text`.
- default `next_indent = None` such that when set (and not null) we use
  that exact ws-indent instead of calculating it from the
  `len(nest_prefix)` allowing for specifying a `0`-indent easily.
2025-06-22 20:57:51 -04:00
Tyler Goodlet bff32b0ad7 Drop 'IPC' prefix from `._server` types
We already have the `.ipc` sub-pkg name so it seems a bit
redundant/noisy for a namespace path Bp

Leave an alias for the `Server` rn since it's already used in a few
other internal mods.. will likely rename later if everyone is cool with
it..
2025-06-17 23:33:58 -04:00
7 changed files with 394 additions and 146 deletions

View File

@ -49,7 +49,7 @@ def test_basic_ipc_server(
) )
assert server._no_more_peers.is_set() assert server._no_more_peers.is_set()
eps: list[ipc.IPCEndpoint] = await server.listen_on( eps: list[ipc._server.Endpoint] = await server.listen_on(
accept_addrs=[rando_addr], accept_addrs=[rando_addr],
stream_handler_nursery=None, stream_handler_nursery=None,
) )

View File

@ -234,7 +234,7 @@ class Actor:
# state # state
self._cancel_complete = trio.Event() self._cancel_complete = trio.Event()
self._cancel_called_by_remote: tuple[str, tuple]|None = None self._cancel_called_by: tuple[str, tuple]|None = None
self._cancel_called: bool = False self._cancel_called: bool = False
# retreive and store parent `__main__` data which # retreive and store parent `__main__` data which
@ -346,69 +346,118 @@ class Actor:
def pid(self) -> int: def pid(self) -> int:
return self._aid.pid return self._aid.pid
@property
def repr_state(self) -> str:
if self.cancel_complete:
return 'cancelled'
elif canceller := self.cancel_caller:
return f' and cancel-called by {canceller}'
else:
return 'running'
def pformat( def pformat(
self, self,
ds: str = ': ', ds: str = ': ',
indent: int = 0, indent: int = 0,
privates: bool = False,
) -> str: ) -> str:
fields_sect_prefix: str = ' |_'
parent_uid: tuple|None = None fmtstr: str = f'|_id: {self.aid.reprol()!r}\n'
if privates:
aid_nest_prefix: str = '|_aid='
aid_field_repr: str = _pformat.nest_from_op(
input_op='',
text=pretty_struct.pformat(
struct=self.aid,
field_indent=2,
),
op_suffix='',
nest_prefix=aid_nest_prefix,
nest_indent=0,
)
fmtstr: str = f'{aid_field_repr}'
if rent_chan := self._parent_chan: if rent_chan := self._parent_chan:
parent_uid = rent_chan.uid fmtstr += (
f"|_parent{ds}{rent_chan.aid.reprol()}\n"
)
peers: list = []
server: _server.IPCServer = self.ipc_server server: _server.IPCServer = self.ipc_server
ipc_server_sect: str = ''
if server: if server:
peers: list[tuple] = list(server._peer_connected) if privates:
server_repr: str = self._ipc_server.pformat(
privates=privates,
)
# create field ln as a key-header indented under
# and up to the section's key prefix.
# ^XXX if we were to indent `repr(Server)` to
# '<key>: '
# _here_^
server_repr: str = _pformat.nest_from_op(
input_op='', # nest as sub-obj
op_suffix='',
text=server_repr,
)
fmtstr += (
f"{server_repr}"
)
else:
fmtstr += (
f'|_ipc: {server.repr_state!r}\n'
)
# create field ln as a key-header indented under fmtstr += (
# and up to the section's key prefix. f'|_rpc: {len(self._rpc_tasks)} active tasks\n'
# field_ln_header: str = textwrap.indent(
# text=f"ipc_server{ds}",
# prefix=' '*len(fields_sect_prefix),
# )
# ^XXX if we were to indent `repr(Server)` to
# '<key>: '
# _here_^
server_repr: str = textwrap.indent(
text=self._ipc_server.pformat(),
# prefix=' '*len(field_ln_header),
prefix=' '*len(fields_sect_prefix),
)
ipc_server_sect: str = (
# f'{field_ln_header}\n'
f'{server_repr}'
)
fmtstr: str = (
f' |_id: {self.aid!r}\n'
# f" aid{ds}{self.aid!r}\n"
f" parent{ds}{parent_uid}\n"
# f'\n'
f' |_ipc: {len(peers)!r} connected peers\n'
f" peers{ds}{peers!r}\n"
f"{ipc_server_sect}"
# f'\n'
f' |_rpc: {len(self._rpc_tasks)} tasks\n'
f" ctxs{ds}{len(self._contexts)}\n"
# f'\n'
f' |_runtime: ._task{ds}{self._task!r}\n'
f' _spawn_method{ds}{self._spawn_method}\n'
f' _actoruid2nursery{ds}{self._actoruid2nursery}\n'
f' _forkserver_info{ds}{self._forkserver_info}\n'
# f'\n'
f' |_state: "TODO: .repr_state()"\n'
f' _cancel_complete{ds}{self._cancel_complete}\n'
f' _cancel_called_by_remote{ds}{self._cancel_called_by_remote}\n'
f' _cancel_called{ds}{self._cancel_called}\n'
) )
# TODO, actually fix the .repr_state impl/output?
# append ipc-ctx state summary
# ctxs: dict = self._contexts
# if ctxs:
# ctx_states: dict[str, int] = {}
# for ctx in self._contexts.values():
# ctx_state: str = ctx.repr_state
# cnt = ctx_states.setdefault(ctx_state, 0)
# ctx_states[ctx_state] = cnt + 1
# fmtstr += (
# f" ctxs{ds}{ctx_states}\n"
# )
# runtime-state
task_name: str = '<dne>'
if task := self._task:
task_name: str = task.name
fmtstr += (
# TODO, this just like ctx?
f'|_state: {self.repr_state!r}\n'
f' task: {task_name}\n'
f' loglevel: {self.loglevel!r}\n'
f' subactors_spawned: {len(self._actoruid2nursery)}\n'
)
if not _state.is_root_process():
fmtstr += f' spawn_method: {self._spawn_method!r}\n'
if privates:
fmtstr += (
# f' actoruid2nursery{ds}{self._actoruid2nursery}\n'
f' cancel_complete{ds}{self._cancel_complete}\n'
f' cancel_called_by_remote{ds}{self._cancel_called_by}\n'
f' cancel_called{ds}{self._cancel_called}\n'
)
if fmtstr:
fmtstr: str = textwrap.indent(
text=fmtstr,
prefix=' '*(1 + indent),
)
_repr: str = ( _repr: str = (
'<Actor(\n' f'<{type(self).__name__}(\n'
+ f'{fmtstr}'
fmtstr f')>\n'
+
')>\n'
) )
if indent: if indent:
_repr: str = textwrap.indent( _repr: str = textwrap.indent(
@ -533,11 +582,11 @@ class Actor:
queue. queue.
''' '''
uid: tuple[str, str] = chan.uid aid: msgtypes.Aid = chan.aid
assert uid, f"`chan.uid` can't be {uid}" assert aid, f"`chan.aid` can't be {aid}"
try: try:
ctx: Context = self._contexts[( ctx: Context = self._contexts[(
uid, aid.uid,
cid, cid,
# TODO: how to determine this tho? # TODO: how to determine this tho?
@ -548,7 +597,7 @@ class Actor:
'Ignoring invalid IPC msg!?\n' 'Ignoring invalid IPC msg!?\n'
f'Ctx seems to not/no-longer exist??\n' f'Ctx seems to not/no-longer exist??\n'
f'\n' f'\n'
f'<=? {uid}\n' f'<=? {aid.reprol()!r}\n'
f' |_{pretty_struct.pformat(msg)}\n' f' |_{pretty_struct.pformat(msg)}\n'
) )
match msg: match msg:
@ -597,6 +646,7 @@ class Actor:
msging session's lifetime. msging session's lifetime.
''' '''
# ?TODO, use Aid here as well?
actor_uid = chan.uid actor_uid = chan.uid
assert actor_uid assert actor_uid
try: try:
@ -945,6 +995,22 @@ class Actor:
None, # self cancel all rpc tasks None, # self cancel all rpc tasks
) )
@property
def cancel_complete(self) -> bool:
return self._cancel_complete.is_set()
@property
def cancel_called(self) -> bool:
'''
Was this actor requested to cancel by a remote peer actor.
'''
return self._cancel_called_by is not None
@property
def cancel_caller(self) -> msgtypes.Aid|None:
return self._cancel_called_by
async def cancel( async def cancel(
self, self,
@ -969,20 +1035,18 @@ class Actor:
''' '''
( (
requesting_uid, requesting_aid, # Aid
requester_type, requester_type, # str
req_chan, req_chan,
log_meth, log_meth,
) = ( ) = (
req_chan.uid, req_chan.aid,
'peer', 'peer',
req_chan, req_chan,
log.cancel, log.cancel,
) if req_chan else ( ) if req_chan else (
# a self cancel of ALL rpc tasks # a self cancel of ALL rpc tasks
self.uid, self.aid,
'self', 'self',
self, self,
log.runtime, log.runtime,
@ -990,14 +1054,14 @@ class Actor:
# TODO: just use the new `Context.repr_rpc: str` (and # TODO: just use the new `Context.repr_rpc: str` (and
# other) repr fields instead of doing this all manual.. # other) repr fields instead of doing this all manual..
msg: str = ( msg: str = (
f'Actor-runtime cancel request from {requester_type}\n\n' f'Actor-runtime cancel request from {requester_type!r}\n'
f'<=c) {requesting_uid}\n'
f' |_{self}\n'
f'\n' f'\n'
f'<=c)\n'
f'{self}'
) )
# TODO: what happens here when we self-cancel tho? # TODO: what happens here when we self-cancel tho?
self._cancel_called_by_remote: tuple = requesting_uid self._cancel_called_by: tuple = requesting_aid
self._cancel_called = True self._cancel_called = True
# cancel all ongoing rpc tasks # cancel all ongoing rpc tasks
@ -1025,7 +1089,7 @@ class Actor:
# self-cancel **all** ongoing RPC tasks # self-cancel **all** ongoing RPC tasks
await self.cancel_rpc_tasks( await self.cancel_rpc_tasks(
req_uid=requesting_uid, req_aid=requesting_aid,
parent_chan=None, parent_chan=None,
) )
@ -1054,8 +1118,7 @@ class Actor:
self, self,
cid: str, cid: str,
parent_chan: Channel, parent_chan: Channel,
requesting_uid: tuple[str, str]|None, requesting_aid: msgtypes.Aid|None,
# ^^TODO! use the `Aid` directly here!
ipc_msg: dict|None|bool = False, ipc_msg: dict|None|bool = False,
@ -1093,7 +1156,7 @@ class Actor:
log.runtime( log.runtime(
'Cancel request for invalid RPC task.\n' 'Cancel request for invalid RPC task.\n'
'The task likely already completed or was never started!\n\n' 'The task likely already completed or was never started!\n\n'
f'<= canceller: {requesting_uid}\n' f'<= canceller: {requesting_aid}\n'
f'=> {cid}@{parent_chan.uid}\n' f'=> {cid}@{parent_chan.uid}\n'
f' |_{parent_chan}\n' f' |_{parent_chan}\n'
) )
@ -1101,7 +1164,7 @@ class Actor:
log.cancel( log.cancel(
'Rxed cancel request for RPC task\n' 'Rxed cancel request for RPC task\n'
f'{ctx._task!r} <=c) {requesting_uid}\n' f'{ctx._task!r} <=c) {requesting_aid}\n'
f'|_>> {ctx.repr_rpc}\n' f'|_>> {ctx.repr_rpc}\n'
# f'|_{ctx._task}\n' # f'|_{ctx._task}\n'
@ -1127,9 +1190,9 @@ class Actor:
) )
if ( if (
ctx._canceller is None ctx._canceller is None
and requesting_uid and requesting_aid
): ):
ctx._canceller: tuple = requesting_uid ctx._canceller: tuple = requesting_aid.uid
# TODO: pack the RPC `{'cmd': <blah>}` msg into a ctxc and # TODO: pack the RPC `{'cmd': <blah>}` msg into a ctxc and
# then raise and pack it here? # then raise and pack it here?
@ -1155,7 +1218,7 @@ class Actor:
# wait for _invoke to mark the task complete # wait for _invoke to mark the task complete
flow_info: str = ( flow_info: str = (
f'<= canceller: {requesting_uid}\n' f'<= canceller: {requesting_aid}\n'
f'=> ipc-parent: {parent_chan}\n' f'=> ipc-parent: {parent_chan}\n'
f'|_{ctx}\n' f'|_{ctx}\n'
) )
@ -1172,7 +1235,7 @@ class Actor:
async def cancel_rpc_tasks( async def cancel_rpc_tasks(
self, self,
req_uid: tuple[str, str], req_aid: msgtypes.Aid,
# NOTE: when None is passed we cancel **all** rpc # NOTE: when None is passed we cancel **all** rpc
# tasks running in this actor! # tasks running in this actor!
@ -1189,7 +1252,7 @@ class Actor:
if not tasks: if not tasks:
log.runtime( log.runtime(
'Actor has no cancellable RPC tasks?\n' 'Actor has no cancellable RPC tasks?\n'
f'<= canceller: {req_uid}\n' f'<= canceller: {req_aid.reprol()}\n'
) )
return return
@ -1229,7 +1292,7 @@ class Actor:
) )
log.cancel( log.cancel(
f'Cancelling {descr} RPC tasks\n\n' f'Cancelling {descr} RPC tasks\n\n'
f'<=c) {req_uid} [canceller]\n' f'<=c) {req_aid} [canceller]\n'
f'{rent_chan_repr}' f'{rent_chan_repr}'
f'c)=> {self.uid} [cancellee]\n' f'c)=> {self.uid} [cancellee]\n'
f' |_{self} [with {len(tasks)} tasks]\n' f' |_{self} [with {len(tasks)} tasks]\n'
@ -1257,7 +1320,7 @@ class Actor:
await self._cancel_task( await self._cancel_task(
cid, cid,
task_caller_chan, task_caller_chan,
requesting_uid=req_uid, requesting_aid=req_aid,
) )
if tasks: if tasks:
@ -1554,8 +1617,9 @@ async def async_main(
# 'Blocking on service nursery to exit..\n' # 'Blocking on service nursery to exit..\n'
) )
log.runtime( log.runtime(
"Service nursery complete\n" 'Service nursery complete\n'
"Waiting on root nursery to complete" '\n'
'-> Waiting on root nursery to complete'
) )
# Blocks here as expected until the root nursery is # Blocks here as expected until the root nursery is
@ -1705,7 +1769,7 @@ async def async_main(
) )
teardown_report += ( teardown_report += (
'Actor runtime exited\n' 'Actor runtime exited\n'
f'{op_nested_actor_repr}\n' f'{op_nested_actor_repr}'
) )
log.info(teardown_report) log.info(teardown_report)

View File

@ -15,8 +15,10 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
''' '''
Pretty formatters for use throughout the code base. Pretty formatters for use throughout our internals.
Mostly handy for logging and exception message content.
Handy for logging and exception message content but also for `repr()`
in REPL(s).
''' '''
import sys import sys
@ -224,8 +226,8 @@ def pformat_cs(
field_prefix: str = ' |_', field_prefix: str = ' |_',
) -> str: ) -> str:
''' '''
Pretty format info about a `trio.CancelScope` including most Pretty format info about a `trio.CancelScope` including most of
of its public state and `._cancel_status`. its public state and `._cancel_status`.
The output can be modified to show a "var name" for the The output can be modified to show a "var name" for the
instance as a field prefix, just a simple str before each instance as a field prefix, just a simple str before each
@ -252,9 +254,12 @@ def pformat_cs(
def nest_from_op( def nest_from_op(
input_op: str, # TODO, Literal of all op-"symbols" from below? input_op: str, # TODO, Literal of all op-"symbols" from below?
text: str, text: str,
prefix_op: bool = True, # unset is to suffix the first line
# optionally suffix `text`, by def on a newline
op_suffix='\n',
nest_prefix: str = '|_', nest_prefix: str = '|_',
nest_indent: int = 0, nest_indent: int|None = None,
# XXX indent `next_prefix` "to-the-right-of" `input_op` # XXX indent `next_prefix` "to-the-right-of" `input_op`
# by this count of whitespaces (' '). # by this count of whitespaces (' ').
@ -348,11 +353,15 @@ def nest_from_op(
prefix=nest_indent*' ', prefix=nest_indent*' ',
) )
tree_str_indent: int = len(nest_prefix) indented_tree_str: str = text
indented_tree_str: str = textwrap.indent( tree_str_indent: int = 0
text, if nest_indent != 0:
prefix=' '*tree_str_indent tree_str_indent: int = len(nest_prefix)
) indented_tree_str: str = textwrap.indent(
text,
prefix=' '*tree_str_indent
)
# inject any provided nesting-prefix chars # inject any provided nesting-prefix chars
# into the head of the first line. # into the head of the first line.
if nest_prefix: if nest_prefix:
@ -360,7 +369,126 @@ def nest_from_op(
f'{nest_prefix}{indented_tree_str[tree_str_indent:]}' f'{nest_prefix}{indented_tree_str[tree_str_indent:]}'
) )
return ( if prefix_op:
f'{input_op}\n' return (
f'{indented_tree_str}' f'{input_op}{op_suffix}'
f'{indented_tree_str}'
)
else:
tree_lns: list[str] = indented_tree_str.splitlines()
first: str = tree_lns[0]
rest: str = '\n'.join(tree_lns[1:])
return (
f'{first}{input_op}{op_suffix}'
f'{rest}'
)
# ------ modden.repr ------
# XXX originally taken verbaatim from `modden.repr`
'''
More "multi-line" representation then the stdlib's `pprint` equivs.
'''
from inspect import (
FrameInfo,
stack,
)
import pprint
import reprlib
from typing import (
Callable,
)
def mk_repr(
**repr_kws,
) -> Callable[[str], str]:
'''
Allocate and deliver a `repr.Repr` instance with provided input
settings using the std-lib's `reprlib` mod,
* https://docs.python.org/3/library/reprlib.html
------ Ex. ------
An up to 6-layer-nested `dict` as multi-line:
- https://stackoverflow.com/a/79102479
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
'''
def_kws: dict[str, int] = dict(
indent=3, # indent used for repr of recursive objects
maxlevel=616, # recursion levels
maxdict=616, # max items shown for `dict`
maxlist=616, # max items shown for `dict`
maxstring=616, # match editor line-len limit
maxtuple=616, # match editor line-len limit
maxother=616, # match editor line-len limit
) )
def_kws |= repr_kws
reprr = reprlib.Repr(**def_kws)
return reprr.repr
def ppfmt(
obj: object,
do_print: bool = False,
) -> str:
'''
The `pprint.pformat()` version of `pprint.pp()`, namely
a default `sort_dicts=False`.. (which i think should be
the normal default in the stdlib).
'''
pprepr: Callable = mk_repr()
repr_str: str = pprepr(obj)
if do_print:
return pprint.pp(repr_str)
return repr_str
pformat = ppfmt
def pfmt_frame_info(fi: FrameInfo) -> str:
'''
Like a std `inspect.FrameInfo.__repr__()` but multi-line..
'''
return (
'FrameInfo(\n'
' frame={!r},\n'
' filename={!r},\n'
' lineno={!r},\n'
' function={!r},\n'
' code_context={!r},\n'
' index={!r},\n'
' positions={!r})'
).format(
fi.frame,
fi.filename,
fi.lineno,
fi.function,
fi.code_context,
fi.index,
fi.positions
)
def pfmt_callstack(frames: int = 1) -> str:
'''
Generate a string of nested `inspect.FrameInfo` objects returned
from a `inspect.stack()` call such that only the `.frame` field
for each layer is pprinted.
'''
caller_frames: list[FrameInfo] = stack()[1:1+frames]
frames_str: str = ''
for i, frame_info in enumerate(caller_frames):
frames_str += textwrap.indent(
f'{frame_info.frame!r}\n',
prefix=' '*i,
)
return frames_str

View File

@ -196,9 +196,12 @@ class Channel:
self._transport.codec = orig self._transport.codec = orig
# TODO: do a .src/.dst: str for maddrs? # TODO: do a .src/.dst: str for maddrs?
def pformat(self) -> str: def pformat(
self,
privates: bool = False,
) -> str:
if not self._transport: if not self._transport:
return '<Channel with inactive transport?>' return '<Channel( with inactive transport? )>'
tpt: MsgTransport = self._transport tpt: MsgTransport = self._transport
tpt_name: str = type(tpt).__name__ tpt_name: str = type(tpt).__name__
@ -206,14 +209,17 @@ class Channel:
'connected' if self.connected() 'connected' if self.connected()
else 'closed' else 'closed'
) )
return ( repr_str: str = (
f'<Channel(\n' f'<Channel(\n'
f' |_status: {tpt_status!r}\n' f' |_status: {tpt_status!r}\n'
) + (
f' _closed={self._closed}\n' f' _closed={self._closed}\n'
f' _cancel_called={self._cancel_called}\n' f' _cancel_called={self._cancel_called}\n'
f'\n' if privates else ''
f' |_peer: {self.aid}\n' ) + ( # peer-actor (processs) section
f'\n' f' |_peer: {self.aid.reprol()!r}\n'
if self.aid else '<unknown>'
) + (
f' |_msgstream: {tpt_name}\n' f' |_msgstream: {tpt_name}\n'
f' proto={tpt.laddr.proto_key!r}\n' f' proto={tpt.laddr.proto_key!r}\n'
f' layer={tpt.layer_key!r}\n' f' layer={tpt.layer_key!r}\n'
@ -223,9 +229,13 @@ class Channel:
f' stream={tpt.stream}\n' f' stream={tpt.stream}\n'
f' maddr={tpt.maddr!r}\n' f' maddr={tpt.maddr!r}\n'
f' drained={tpt.drained}\n' f' drained={tpt.drained}\n'
) + (
f' _send_lock={tpt._send_lock.statistics()}\n' f' _send_lock={tpt._send_lock.statistics()}\n'
f')>\n' if privates else ''
) + (
')>\n'
) )
return repr_str
# NOTE: making this return a value that can be passed to # NOTE: making this return a value that can be passed to
# `eval()` is entirely **optional** FYI! # `eval()` is entirely **optional** FYI!

View File

@ -289,7 +289,7 @@ async def maybe_wait_on_canced_subs(
# #
# -[x] maybe change to mod-func and rename for implied # -[x] maybe change to mod-func and rename for implied
# multi-transport semantics? # multi-transport semantics?
# -[ ] register each stream/tpt/chan with the owning `IPCEndpoint` # -[ ] register each stream/tpt/chan with the owning `Endpoint`
# so that we can query per tpt all peer contact infos? # so that we can query per tpt all peer contact infos?
# |_[ ] possibly provide a global viewing via a # |_[ ] possibly provide a global viewing via a
# `collections.ChainMap`? # `collections.ChainMap`?
@ -309,7 +309,7 @@ async def handle_stream_from_peer(
any `IPCServer.listen_on()` passed `stream_handler_tn: Nursery` any `IPCServer.listen_on()` passed `stream_handler_tn: Nursery`
such that it is invoked as, such that it is invoked as,
IPCEndpoint.stream_handler_tn.start_soon( Endpoint.stream_handler_tn.start_soon(
handle_stream, handle_stream,
stream, stream,
) )
@ -577,7 +577,7 @@ async def handle_stream_from_peer(
# finally block closure # finally block closure
class IPCEndpoint(Struct): class Endpoint(Struct):
''' '''
An instance of an IPC "bound" address where the lifetime of the An instance of an IPC "bound" address where the lifetime of the
"ability to accept connections" (from clients) and then handle "ability to accept connections" (from clients) and then handle
@ -636,7 +636,7 @@ class IPCEndpoint(Struct):
) )
class IPCServer(Struct): class Server(Struct):
_parent_tn: Nursery _parent_tn: Nursery
_stream_handler_tn: Nursery _stream_handler_tn: Nursery
# level-triggered sig for whether "no peers are currently # level-triggered sig for whether "no peers are currently
@ -644,7 +644,7 @@ class IPCServer(Struct):
# initialized with `.is_set() == True`. # initialized with `.is_set() == True`.
_no_more_peers: trio.Event _no_more_peers: trio.Event
_endpoints: list[IPCEndpoint] = [] _endpoints: list[Endpoint] = []
# connection tracking & mgmt # connection tracking & mgmt
_peers: defaultdict[ _peers: defaultdict[
@ -659,10 +659,10 @@ class IPCServer(Struct):
# syncs for setup/teardown sequences # syncs for setup/teardown sequences
_shutdown: trio.Event|None = None _shutdown: trio.Event|None = None
# TODO, maybe just make `._endpoints: list[IPCEndpoint]` and # TODO, maybe just make `._endpoints: list[Endpoint]` and
# provide dict-views onto it? # provide dict-views onto it?
# @property # @property
# def addrs2eps(self) -> dict[Address, IPCEndpoint]: # def addrs2eps(self) -> dict[Address, Endpoint]:
# ... # ...
@property @property
@ -708,7 +708,7 @@ class IPCServer(Struct):
await self._shutdown.wait() await self._shutdown.wait()
else: else:
tpt_protos: list[str] = [] tpt_protos: list[str] = []
ep: IPCEndpoint ep: Endpoint
for ep in self._endpoints: for ep in self._endpoints:
tpt_protos.append(ep.addr.proto_key) tpt_protos.append(ep.addr.proto_key)
@ -790,7 +790,7 @@ class IPCServer(Struct):
def epsdict(self) -> dict[ def epsdict(self) -> dict[
Address, Address,
IPCEndpoint, Endpoint,
]: ]:
return { return {
ep.addr: ep ep.addr: ep
@ -804,7 +804,7 @@ class IPCServer(Struct):
return ev.is_set() return ev.is_set()
def pformat(self) -> str: def pformat(self) -> str:
eps: list[IPCEndpoint] = self._endpoints eps: list[Endpoint] = self._endpoints
state_repr: str = ( state_repr: str = (
f'{len(eps)!r} IPC-endpoints active' f'{len(eps)!r} IPC-endpoints active'
@ -835,13 +835,13 @@ class IPCServer(Struct):
# TODO? maybe allow shutting down a `.listen_on()`s worth of # TODO? maybe allow shutting down a `.listen_on()`s worth of
# listeners by cancelling the corresponding # listeners by cancelling the corresponding
# `IPCEndpoint._listen_tn` only ? # `Endpoint._listen_tn` only ?
# -[ ] in theory you could use this to # -[ ] in theory you could use this to
# "boot-and-wait-for-reconnect" of all current and connecting # "boot-and-wait-for-reconnect" of all current and connecting
# peers? # peers?
# |_ would require that the stream-handler is intercepted so we # |_ would require that the stream-handler is intercepted so we
# can intercept every `MsgTransport` (stream) and track per # can intercept every `MsgTransport` (stream) and track per
# `IPCEndpoint` likely? # `Endpoint` likely?
# #
# async def unlisten( # async def unlisten(
# self, # self,
@ -854,7 +854,7 @@ class IPCServer(Struct):
*, *,
accept_addrs: list[tuple[str, int|str]]|None = None, accept_addrs: list[tuple[str, int|str]]|None = None,
stream_handler_nursery: Nursery|None = None, stream_handler_nursery: Nursery|None = None,
) -> list[IPCEndpoint]: ) -> list[Endpoint]:
''' '''
Start `SocketListeners` (i.e. bind and call `socket.listen()`) Start `SocketListeners` (i.e. bind and call `socket.listen()`)
for all IPC-transport-protocol specific `Address`-types for all IPC-transport-protocol specific `Address`-types
@ -888,7 +888,7 @@ class IPCServer(Struct):
f'Binding to endpoints for,\n' f'Binding to endpoints for,\n'
f'{accept_addrs}\n' f'{accept_addrs}\n'
) )
eps: list[IPCEndpoint] = await self._parent_tn.start( eps: list[Endpoint] = await self._parent_tn.start(
partial( partial(
_serve_ipc_eps, _serve_ipc_eps,
server=self, server=self,
@ -904,7 +904,7 @@ class IPCServer(Struct):
self._endpoints.extend(eps) self._endpoints.extend(eps)
# XXX, just a little bit of sanity # XXX, just a little bit of sanity
group_tn: Nursery|None = None group_tn: Nursery|None = None
ep: IPCEndpoint ep: Endpoint
for ep in eps: for ep in eps:
if ep.addr not in self.addrs: if ep.addr not in self.addrs:
breakpoint() breakpoint()
@ -917,6 +917,10 @@ class IPCServer(Struct):
return eps return eps
# alias until we decide on final naming
IPCServer = Server
async def _serve_ipc_eps( async def _serve_ipc_eps(
*, *,
server: IPCServer, server: IPCServer,
@ -941,12 +945,12 @@ async def _serve_ipc_eps(
listen_tn: Nursery listen_tn: Nursery
async with trio.open_nursery() as listen_tn: async with trio.open_nursery() as listen_tn:
eps: list[IPCEndpoint] = [] eps: list[Endpoint] = []
# XXX NOTE, required to call `serve_listeners()` below. # XXX NOTE, required to call `serve_listeners()` below.
# ?TODO, maybe just pass `list(eps.values()` tho? # ?TODO, maybe just pass `list(eps.values()` tho?
listeners: list[trio.abc.Listener] = [] listeners: list[trio.abc.Listener] = []
for addr in listen_addrs: for addr in listen_addrs:
ep = IPCEndpoint( ep = Endpoint(
addr=addr, addr=addr,
listen_tn=listen_tn, listen_tn=listen_tn,
stream_handler_tn=stream_handler_tn, stream_handler_tn=stream_handler_tn,
@ -1010,7 +1014,7 @@ async def _serve_ipc_eps(
finally: finally:
if eps: if eps:
addr: Address addr: Address
ep: IPCEndpoint ep: Endpoint
for addr, ep in server.epsdict().items(): for addr, ep in server.epsdict().items():
ep.close_listener() ep.close_listener()
server._endpoints.remove(ep) server._endpoints.remove(ep)

View File

@ -20,6 +20,7 @@ Prettified version of `msgspec.Struct` for easier console grokin.
''' '''
from __future__ import annotations from __future__ import annotations
from collections import UserList from collections import UserList
import textwrap
from typing import ( from typing import (
Any, Any,
Iterator, Iterator,
@ -105,27 +106,11 @@ def iter_fields(struct: Struct) -> Iterator[
) )
def pformat( def iter_struct_ppfmt_lines(
struct: Struct, struct: Struct,
field_indent: int = 2, field_indent: int = 0,
indent: int = 0, ) -> Iterator[tuple[str, str]]:
) -> str:
'''
Recursion-safe `pprint.pformat()` style formatting of
a `msgspec.Struct` for sane reading by a human using a REPL.
'''
# global whitespace indent
ws: str = ' '*indent
# field whitespace indent
field_ws: str = ' '*(field_indent + indent)
# qtn: str = ws + struct.__class__.__qualname__
qtn: str = struct.__class__.__qualname__
obj_str: str = '' # accumulator
fi: structs.FieldInfo fi: structs.FieldInfo
k: str k: str
v: Any v: Any
@ -135,15 +120,18 @@ def pformat(
# ..]` over .__name__ == `Literal` but still get only the # ..]` over .__name__ == `Literal` but still get only the
# latter for simple types like `str | int | None` etc..? # latter for simple types like `str | int | None` etc..?
ft: type = fi.type ft: type = fi.type
typ_name: str = getattr(ft, '__name__', str(ft)) typ_name: str = getattr(
ft,
'__name__',
str(ft)
).replace(' ', '')
# recurse to get sub-struct's `.pformat()` output Bo # recurse to get sub-struct's `.pformat()` output Bo
if isinstance(v, Struct): if isinstance(v, Struct):
val_str: str = v.pformat( yield from iter_struct_ppfmt_lines(
indent=field_indent + indent, struct=v,
field_indent=indent + field_indent, field_indent=field_indent+field_indent,
) )
else: else:
val_str: str = repr(v) val_str: str = repr(v)
@ -161,8 +149,39 @@ def pformat(
# raise # raise
# return _Struct.__repr__(struct) # return _Struct.__repr__(struct)
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg! yield (
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n') ' '*field_indent, # indented ws prefix
f'{k}: {typ_name} = {val_str},', # field's repr line content
)
def pformat(
struct: Struct,
field_indent: int = 2,
indent: int = 0,
) -> str:
'''
Recursion-safe `pprint.pformat()` style formatting of
a `msgspec.Struct` for sane reading by a human using a REPL.
'''
obj_str: str = '' # accumulator
for prefix, field_repr, in iter_struct_ppfmt_lines(
struct,
field_indent=field_indent,
):
obj_str += f'{prefix}{field_repr}\n'
# global whitespace indent
ws: str = ' '*indent
if indent:
obj_str: str = textwrap.indent(
text=obj_str,
prefix=ws,
)
# qtn: str = ws + struct.__class__.__qualname__
qtn: str = struct.__class__.__qualname__
return ( return (
f'{qtn}(\n' f'{qtn}(\n'

View File

@ -154,6 +154,29 @@ class Aid(
# should also include at least `.pid` (equiv to port for tcp) # should also include at least `.pid` (equiv to port for tcp)
# and/or host-part always? # and/or host-part always?
@property
def uid(self) -> tuple[str, str]:
'''
Legacy actor "unique-id" pair format.
'''
return (
self.name,
self.uuid,
)
def reprol(
self,
sin_uuid: bool = True,
) -> str:
if not sin_uuid:
return (
f'{self.name}[{self.uuid[:6]}]@{self.pid!r}'
)
return (
f'{self.name}@{self.pid!r}'
)
class SpawnSpec( class SpawnSpec(
pretty_struct.Struct, pretty_struct.Struct,