Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet c0058024c2 Add todo for py3.13+ `.shared_memory`'s new `track=False` support.. finally they added it XD 2025-07-04 00:08:52 -04:00
Tyler Goodlet 065104401c Even more `.ipc.*` repr refinements
Mostly adjusting indentation, noise level, and clarity via `.pformat()`
tweaks more general use of `.devx.pformat.nest_from_op()`.

Specific impl deats,
- use `pformat.ppfmt()/`nest_from_op()` more seriously throughout
  `._server`.
- add a `._server.Endpoint.pformat()`.
- add `._server.Server.len_peers()` and `.repr_state()`.
- polish `Server.pformat()`.
- drop some redundant `log.runtime()`s from `._serve_ipc_eps()` instead
  leaving-them-only/putting-them in the caller pub meth.
- `._tcp.start_listener()` log the bound addr, not the input (which may
  be the 0-port.
2025-07-03 23:33:02 -04:00
Tyler Goodlet 3201437f4e More `.ipc.Channel`-repr related tweaks
- only generate a repr in `.from_addr()` when log level is >= 'runtime'.
 |_ add a todo about supporting this optimization more generally on our
   adapter.
- fix `Channel.pformat()` to show unknown peer field line fmt correctly.
- add a `Channel.maddr: str` which just delegates directly to the
  `._transport` like other pass-thru property fields.
2025-06-29 15:47:42 -04:00
Tyler Goodlet a9da16892d Mk `Aid` hashable, use pretty-`.__repr__()`
Hash on the `.uuid: str` and delegate verbatim to
`msg.pretty_struct.Struct`'s equiv method.
2025-06-29 15:39:09 -04:00
Tyler Goodlet 1b609113c3 .trionics: link in `finally`-footgun `trio` GH ish 2025-06-29 15:34:10 -04:00
Tyler Goodlet 4a80cda841 .log: expose `at_least_level()` as `StackLevelAdapter` meth 2025-06-29 15:33:31 -04:00
Tyler Goodlet 131e2ee0a4 Drop `actor_info: str` from `._entry` logs 2025-06-29 14:59:50 -04:00
Tyler Goodlet 79ef973058 Try `nest_from_op()` in some `._rpc` spots
To start trying out,
- using in the `Start`-msg handler-block to repr the msg coming
  *from* a `repr(Channel)` using '<=)` sclang op.
- for a completed RPC task in `_invoke_non_context()`.
- for the msg loop task's termination report.
2025-06-29 14:47:03 -04:00
Tyler Goodlet c738492879 Hide more `Channel._transport` privates for repr
Such as the `MsgTransport.stream` and `.drain` attrs since they're
rarely that important at the chan level. Also start adopting
a `.<attr>=` style for actual attrs of the type versus a `<name>:
` style for meta-field info lines.
2025-06-29 13:41:10 -04:00
Tyler Goodlet a931274da6 Moar `nest_from_op()` tweaks..
- better `nest_indent` logic where we either get a non-zero value and
  apply it strictly to both the `nest_prefix` and `text`, OR we
  auto-calc it from any `nest_prefix`, NOT a conflation of both..
- add a `rm_from_first_ln: str` which allows removing chars from the
  first line of `text` after a `str.strip()` (handy for removing
  the '<Channel' first chevron from type-reprs).
2025-06-29 13:37:32 -04:00
10 changed files with 394 additions and 178 deletions

View File

@ -21,7 +21,7 @@ Sub-process entry points.
from __future__ import annotations
from functools import partial
import multiprocessing as mp
import os
# import os
from typing import (
Any,
TYPE_CHECKING,
@ -38,6 +38,7 @@ from .devx import (
_frame_stack,
pformat,
)
# from .msg import pretty_struct
from .to_asyncio import run_as_asyncio_guest
from ._addr import UnwrappedAddress
from ._runtime import (
@ -127,20 +128,13 @@ def _trio_main(
if actor.loglevel is not None:
get_console_log(actor.loglevel)
actor_info: str = (
f'|_{actor}\n'
f' uid: {actor.uid}\n'
f' pid: {os.getpid()}\n'
f' parent_addr: {parent_addr}\n'
f' loglevel: {actor.loglevel}\n'
)
log.info(
'Starting new `trio` subactor\n'
f'Starting `trio` subactor from parent @ '
f'{parent_addr}\n'
+
pformat.nest_from_op(
input_op='>(', # see syntax ideas above
text=actor_info,
nest_indent=2, # since "complete"
text=f'{actor}',
)
)
logmeth = log.info
@ -149,7 +143,7 @@ def _trio_main(
+
pformat.nest_from_op(
input_op=')>', # like a "closed-to-play"-icon from super perspective
text=actor_info,
text=f'{actor}',
nest_indent=1,
)
)
@ -167,7 +161,7 @@ def _trio_main(
+
pformat.nest_from_op(
input_op='c)>', # closed due to cancel (see above)
text=actor_info,
text=f'{actor}',
)
)
except BaseException as err:
@ -177,7 +171,7 @@ def _trio_main(
+
pformat.nest_from_op(
input_op='x)>', # closed by error
text=actor_info,
text=f'{actor}',
)
)
# NOTE since we raise a tb will already be shown on the

View File

@ -64,6 +64,7 @@ from .trionics import (
from .devx import (
debug,
add_div,
pformat as _pformat,
)
from . import _state
from .log import get_logger
@ -72,7 +73,7 @@ from .msg import (
MsgCodec,
PayloadT,
NamespacePath,
# pretty_struct,
pretty_struct,
_ops as msgops,
)
from tractor.msg.types import (
@ -220,11 +221,18 @@ async def _invoke_non_context(
task_status.started(ctx)
result = await coro
fname: str = func.__name__
op_nested_task: str = _pformat.nest_from_op(
input_op=f')> cid: {ctx.cid!r}',
text=f'{ctx._task}',
nest_indent=1, # under >
)
log.runtime(
'RPC complete:\n'
f'task: {ctx._task}\n'
f'|_cid={ctx.cid}\n'
f'|_{fname}() -> {pformat(result)}\n'
f'RPC task complete\n'
f'\n'
f'{op_nested_task}\n'
f'\n'
f')> {fname}() -> {pformat(result)}\n'
)
# NOTE: only send result if we know IPC isn't down
@ -1043,7 +1051,7 @@ async def process_messages(
):
target_cid: str = kwargs['cid']
kwargs |= {
'requesting_uid': chan.uid,
'requesting_aid': chan.aid,
'ipc_msg': msg,
# XXX NOTE! ONLY the rpc-task-owning
@ -1079,21 +1087,34 @@ async def process_messages(
ns=ns,
func=funcname,
kwargs=kwargs, # type-spec this? see `msg.types`
uid=actorid,
uid=actor_uuid,
):
if actor_uuid != chan.aid.uid:
raise RuntimeError(
f'IPC <Start> msg <-> chan.aid mismatch!?\n'
f'Channel.aid = {chan.aid!r}\n'
f'Start.uid = {actor_uuid!r}\n'
)
# await debug.pause()
op_repr: str = 'Start <=) '
req_repr: str = _pformat.nest_from_op(
input_op=op_repr,
op_suffix='',
nest_prefix='',
text=f'{chan}',
nest_indent=len(op_repr)-1,
rm_from_first_ln='<',
# ^XXX, subtract -1 to account for
# <Channel
# ^_chevron to be stripped
)
start_status: str = (
'Handling RPC `Start` request\n'
f'<= peer: {actorid}\n\n'
f' |_{chan}\n'
f' |_cid: {cid}\n\n'
# f' |_{ns}.{funcname}({kwargs})\n'
f'>> {actor.uid}\n'
f' |_{actor}\n'
f' -> nsp: `{ns}.{funcname}({kwargs})`\n'
# f' |_{ns}.{funcname}({kwargs})\n\n'
# f'{pretty_struct.pformat(msg)}\n'
'Handling RPC request\n'
f'{req_repr}\n'
f'\n'
f'->{{ ipc-context-id: {cid!r}\n'
f'->{{ nsp for fn: `{ns}.{funcname}({kwargs})`\n'
)
# runtime-internal endpoint: `Actor.<funcname>`
@ -1122,10 +1143,6 @@ async def process_messages(
await chan.send(err_msg)
continue
start_status += (
f' -> func: {func}\n'
)
# schedule a task for the requested RPC function
# in the actor's main "service nursery".
#
@ -1133,7 +1150,7 @@ async def process_messages(
# supervision isolation? would avoid having to
# manage RPC tasks individually in `._rpc_tasks`
# table?
start_status += ' -> scheduling new task..\n'
start_status += '->( scheduling new task..\n'
log.runtime(start_status)
try:
ctx: Context = await actor._service_n.start(
@ -1222,7 +1239,7 @@ async def process_messages(
f'|_{chan}\n'
)
await actor.cancel_rpc_tasks(
req_uid=actor.uid,
req_aid=actor.aid,
# a "self cancel" in terms of the lifetime of the
# IPC connection which is presumed to be the
# source of any requests for spawned tasks.
@ -1294,13 +1311,37 @@ async def process_messages(
finally:
# msg debugging for when he machinery is brokey
if msg is None:
message: str = 'Exiting IPC msg loop without receiving a msg?'
message: str = 'Exiting RPC-loop without receiving a msg?'
else:
task_op_repr: str = ')>'
task: trio.Task = trio.lowlevel.current_task()
# maybe add cancelled opt prefix
if task._cancel_status.effectively_cancelled:
task_op_repr = 'c' + task_op_repr
task_repr: str = _pformat.nest_from_op(
input_op=task_op_repr,
text=f'{task!r}',
nest_indent=1,
)
# chan_op_repr: str = '<=} '
# chan_repr: str = _pformat.nest_from_op(
# input_op=chan_op_repr,
# op_suffix='',
# nest_prefix='',
# text=chan.pformat(),
# nest_indent=len(chan_op_repr)-1,
# rm_from_first_ln='<',
# )
message: str = (
'Exiting IPC msg loop with final msg\n\n'
f'<= peer: {chan.uid}\n'
f' |_{chan}\n\n'
# f'{pretty_struct.pformat(msg)}'
f'Exiting RPC-loop with final msg\n'
f'\n'
# f'{chan_repr}\n'
f'{task_repr}\n'
f'\n'
f'{pretty_struct.pformat(msg)}'
f'\n'
)
log.runtime(message)

View File

@ -262,6 +262,7 @@ def nest_from_op(
nest_indent: int|None = None,
# XXX indent `next_prefix` "to-the-right-of" `input_op`
# by this count of whitespaces (' ').
rm_from_first_ln: str|None = None,
) -> str:
'''
@ -346,20 +347,35 @@ def nest_from_op(
if (
nest_prefix
and
nest_indent
nest_indent != 0
):
if nest_indent is not None:
nest_prefix: str = textwrap.indent(
nest_prefix,
prefix=nest_indent*' ',
)
nest_indent: int = len(nest_prefix)
# determine body-text indent either by,
# - using wtv explicit indent value is provided,
# OR
# - auto-calcing the indent to embed `text` under
# the `nest_prefix` if provided, **IFF** `nest_indent=None`.
tree_str_indent: int = 0
if nest_indent not in {0, None}:
tree_str_indent = nest_indent
elif (
nest_prefix
and
nest_indent != 0
):
tree_str_indent = len(nest_prefix)
indented_tree_str: str = text
tree_str_indent: int = 0
if nest_indent != 0:
tree_str_indent: int = len(nest_prefix)
if tree_str_indent:
indented_tree_str: str = textwrap.indent(
text,
prefix=' '*tree_str_indent
prefix=' '*tree_str_indent,
)
# inject any provided nesting-prefix chars
@ -369,18 +385,35 @@ def nest_from_op(
f'{nest_prefix}{indented_tree_str[tree_str_indent:]}'
)
if (
not prefix_op
or
rm_from_first_ln
):
tree_lns: list[str] = indented_tree_str.splitlines()
first: str = tree_lns[0]
if rm_from_first_ln:
first = first.strip().replace(
rm_from_first_ln,
'',
)
indented_tree_str: str = '\n'.join(tree_lns[1:])
if prefix_op:
indented_tree_str = (
f'{first}\n'
f'{indented_tree_str}'
)
if prefix_op:
return (
f'{input_op}{op_suffix}'
f'{indented_tree_str}'
)
else:
tree_lns: list[str] = indented_tree_str.splitlines()
first: str = tree_lns[0]
rest: str = '\n'.join(tree_lns[1:])
return (
f'{first}{input_op}{op_suffix}'
f'{rest}'
f'{indented_tree_str}'
)

View File

@ -171,10 +171,22 @@ class Channel:
)
assert transport.raddr == addr
chan = Channel(transport=transport)
# ?TODO, compact this into adapter level-methods?
# -[ ] would avoid extra repr-calcs if level not active?
# |_ how would the `calc_if_level` look though? func?
if log.at_least_level('runtime'):
from tractor.devx import (
pformat as _pformat,
)
chan_repr: str = _pformat.nest_from_op(
input_op='[>',
text=chan.pformat(),
nest_indent=1,
)
log.runtime(
f'Connected channel IPC transport\n'
f'[>\n'
f' |_{chan}\n'
f'{chan_repr}'
)
return chan
@ -218,17 +230,19 @@ class Channel:
if privates else ''
) + ( # peer-actor (processs) section
f' |_peer: {self.aid.reprol()!r}\n'
if self.aid else '<unknown>'
if self.aid else ' |_peer: <unknown>\n'
) + (
f' |_msgstream: {tpt_name}\n'
f' proto={tpt.laddr.proto_key!r}\n'
f' layer={tpt.layer_key!r}\n'
f' laddr={tpt.laddr}\n'
f' raddr={tpt.raddr}\n'
f' codec={tpt.codec_key!r}\n'
f' stream={tpt.stream}\n'
f' maddr={tpt.maddr!r}\n'
f' drained={tpt.drained}\n'
f' maddr: {tpt.maddr!r}\n'
f' proto: {tpt.laddr.proto_key!r}\n'
f' layer: {tpt.layer_key!r}\n'
f' codec: {tpt.codec_key!r}\n'
f' .laddr={tpt.laddr}\n'
f' .raddr={tpt.raddr}\n'
) + (
f' ._transport.stream={tpt.stream}\n'
f' ._transport.drained={tpt.drained}\n'
if privates else ''
) + (
f' _send_lock={tpt._send_lock.statistics()}\n'
if privates else ''
@ -257,6 +271,10 @@ class Channel:
def raddr(self) -> Address|None:
return self._transport.raddr if self._transport else None
@property
def maddr(self) -> str:
return self._transport.maddr if self._transport else '<no-tpt>'
# TODO: something like,
# `pdbp.hideframe_on(errors=[MsgTypeError])`
# instead of the `try/except` hack we have rn..
@ -444,8 +462,8 @@ class Channel:
await self.send(aid)
peer_aid: Aid = await self.recv()
log.runtime(
f'Received hanshake with peer actor,\n'
f'{peer_aid}\n'
f'Received hanshake with peer\n'
f'<= {peer_aid.reprol(sin_uuid=False)}\n'
)
# NOTE, we always are referencing the remote peer!
self.aid = peer_aid

View File

@ -17,9 +17,16 @@
Utils to tame mp non-SC madeness
'''
# !TODO! in 3.13 this can be disabled (the-same/similarly) using
# a flag,
# - [ ] soo if it works like this, drop this module entirely for
# 3.13+ B)
# |_https://docs.python.org/3/library/multiprocessing.shared_memory.html
#
def disable_mantracker():
'''
Disable all ``multiprocessing``` "resource tracking" machinery since
Disable all `multiprocessing` "resource tracking" machinery since
it's an absolute multi-threaded mess of non-SC madness.
'''

View File

@ -26,7 +26,7 @@ from contextlib import (
from functools import partial
from itertools import chain
import inspect
from pprint import pformat
import textwrap
from types import (
ModuleType,
)
@ -43,7 +43,10 @@ from trio import (
SocketListener,
)
# from ..devx import debug
from ..devx.pformat import (
ppfmt,
nest_from_op,
)
from .._exceptions import (
TransportClosed,
)
@ -141,9 +144,8 @@ async def maybe_wait_on_canced_subs(
):
log.cancel(
'Waiting on cancel request to peer..\n'
f'c)=>\n'
f' |_{chan.aid}\n'
'Waiting on cancel request to peer\n'
f'c)=> {chan.aid.reprol()}@[{chan.maddr}]\n'
)
# XXX: this is a soft wait on the channel (and its
@ -179,7 +181,7 @@ async def maybe_wait_on_canced_subs(
log.warning(
'Draining msg from disconnected peer\n'
f'{chan_info}'
f'{pformat(msg)}\n'
f'{ppfmt(msg)}\n'
)
# cid: str|None = msg.get('cid')
cid: str|None = msg.cid
@ -248,7 +250,7 @@ async def maybe_wait_on_canced_subs(
if children := local_nursery._children:
# indent from above local-nurse repr
report += (
f' |_{pformat(children)}\n'
f' |_{ppfmt(children)}\n'
)
log.warning(report)
@ -279,8 +281,9 @@ async def maybe_wait_on_canced_subs(
log.runtime(
f'Peer IPC broke but subproc is alive?\n\n'
f'<=x {chan.aid}@{chan.raddr}\n'
f' |_{proc}\n'
f'<=x {chan.aid.reprol()}@[{chan.maddr}]\n'
f'\n'
f'{proc}\n'
)
return local_nursery
@ -324,9 +327,10 @@ async def handle_stream_from_peer(
chan = Channel.from_stream(stream)
con_status: str = (
'New inbound IPC connection <=\n'
f'|_{chan}\n'
f'New inbound IPC transport connection\n'
f'<=( {stream!r}\n'
)
con_status_steps: str = ''
# initial handshake with peer phase
try:
@ -372,7 +376,7 @@ async def handle_stream_from_peer(
if _pre_chan := server._peers.get(uid):
familiar: str = 'pre-existing-peer'
uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
con_status += (
con_status_steps += (
f' -> Handshake with {familiar} `{uid_short}` complete\n'
)
@ -397,7 +401,7 @@ async def handle_stream_from_peer(
None,
)
if event:
con_status += (
con_status_steps += (
' -> Waking subactor spawn waiters: '
f'{event.statistics().tasks_waiting}\n'
f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
@ -408,7 +412,7 @@ async def handle_stream_from_peer(
event.set()
else:
con_status += (
con_status_steps += (
f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
) # type: ignore
@ -422,8 +426,15 @@ async def handle_stream_from_peer(
# TODO: can we just use list-ref directly?
chans.append(chan)
con_status += ' -> Entering RPC msg loop..\n'
log.runtime(con_status)
con_status_steps += ' -> Entering RPC msg loop..\n'
log.runtime(
con_status
+
textwrap.indent(
con_status_steps,
prefix=' '*3, # align to first-ln
)
)
# Begin channel management - respond to remote requests and
# process received reponses.
@ -456,41 +467,67 @@ async def handle_stream_from_peer(
disconnected=disconnected,
)
# ``Channel`` teardown and closure sequence
# `Channel` teardown and closure sequence
# drop ref to channel so it can be gc-ed and disconnected
con_teardown_status: str = (
f'IPC channel disconnected:\n'
f'<=x uid: {chan.aid}\n'
f' |_{pformat(chan)}\n\n'
#
# -[x]TODO mk this be like
# <=x Channel(
# |_field: blah
# )>
op_repr: str = '<=x '
chan_repr: str = nest_from_op(
input_op=op_repr,
op_suffix='',
nest_prefix='',
text=chan.pformat(),
nest_indent=len(op_repr)-1,
rm_from_first_ln='<',
)
con_teardown_status: str = (
f'IPC channel disconnect\n'
f'\n'
f'{chan_repr}\n'
f'\n'
)
chans.remove(chan)
# TODO: do we need to be this pedantic?
if not chans:
con_teardown_status += (
f'-> No more channels with {chan.aid}'
f'-> No more channels with {chan.aid.reprol()!r}\n'
)
server._peers.pop(uid, None)
peers_str: str = ''
for uid, chans in server._peers.items():
peers_str += (
f'uid: {uid}\n'
)
for i, chan in enumerate(chans):
peers_str += (
f' |_[{i}] {pformat(chan)}\n'
)
if peers := list(server._peers.values()):
peer_cnt: int = len(peers)
if (
(first := peers[0][0]) is not chan
and
not disconnected
and
peer_cnt > 1
):
con_teardown_status += (
f'-> Remaining IPC {len(server._peers)} peers: {peers_str}\n'
f'-> Remaining IPC {peer_cnt-1!r} peers:\n'
)
for chans in server._peers.values():
first: Channel = chans[0]
if not (
first is chan
and
disconnected
):
con_teardown_status += (
f' |_{first.aid.reprol()!r} -> {len(chans)!r} chans\n'
)
# No more channels to other actors (at all) registered
# as connected.
if not server._peers:
con_teardown_status += (
'Signalling no more peer channel connections'
'-> Signalling no more peer connections!\n'
)
server._no_more_peers.set()
@ -579,10 +616,10 @@ async def handle_stream_from_peer(
class Endpoint(Struct):
'''
An instance of an IPC "bound" address where the lifetime of the
"ability to accept connections" (from clients) and then handle
those inbound sessions or sequences-of-packets is determined by
a (maybe pair of) nurser(y/ies).
An instance of an IPC "bound" address where the lifetime of an
"ability to accept connections" and handle the subsequent
sequence-of-packets (maybe oriented as sessions) is determined by
the underlying nursery scope(s).
'''
addr: Address
@ -600,6 +637,24 @@ class Endpoint(Struct):
MsgTransport, # handle to encoded-msg transport stream
] = {}
def pformat(
self,
indent: int = 0,
privates: bool = False,
) -> str:
type_repr: str = type(self).__name__
fmtstr: str = (
# !TODO, always be ns aware!
# f'|_netns: {netns}\n'
f' |.addr: {self.addr!r}\n'
f' |_peers: {len(self.peer_tpts)}\n'
)
return (
f'<{type_repr}(\n'
f'{fmtstr}'
f')>'
)
async def start_listener(self) -> SocketListener:
tpt_mod: ModuleType = inspect.getmodule(self.addr)
lstnr: SocketListener = await tpt_mod.start_listener(
@ -639,11 +694,13 @@ class Endpoint(Struct):
class Server(Struct):
_parent_tn: Nursery
_stream_handler_tn: Nursery
# level-triggered sig for whether "no peers are currently
# connected"; field is **always** set to an instance but
# initialized with `.is_set() == True`.
_no_more_peers: trio.Event
# active eps as allocated by `.listen_on()`
_endpoints: list[Endpoint] = []
# connection tracking & mgmt
@ -651,12 +708,19 @@ class Server(Struct):
str, # uaid
list[Channel], # IPC conns from peer
] = defaultdict(list)
# events-table with entries registered unset while the local
# actor is waiting on a new actor to inbound connect, often
# a parent waiting on its child just after spawn.
_peer_connected: dict[
tuple[str, str],
trio.Event,
] = {}
# syncs for setup/teardown sequences
# - null when not yet booted,
# - unset when active,
# - set when fully shutdown with 0 eps active.
_shutdown: trio.Event|None = None
# TODO, maybe just make `._endpoints: list[Endpoint]` and
@ -664,7 +728,6 @@ class Server(Struct):
# @property
# def addrs2eps(self) -> dict[Address, Endpoint]:
# ...
@property
def proto_keys(self) -> list[str]:
return [
@ -690,7 +753,7 @@ class Server(Struct):
# TODO: obvi a different server type when we eventually
# support some others XD
log.runtime(
f'Cancelling server(s) for\n'
f'Cancelling server(s) for tpt-protos\n'
f'{self.proto_keys!r}\n'
)
self._parent_tn.cancel_scope.cancel()
@ -717,6 +780,14 @@ class Server(Struct):
f'protos: {tpt_protos!r}\n'
)
def len_peers(
self,
) -> int:
return len([
chan.connected()
for chan in chain(*self._peers.values())
])
def has_peers(
self,
check_chans: bool = False,
@ -730,13 +801,11 @@ class Server(Struct):
has_peers
and
check_chans
and
(peer_cnt := self.len_peers())
):
has_peers: bool = (
any(chan.connected()
for chan in chain(
*self._peers.values()
)
)
peer_cnt > 0
and
has_peers
)
@ -803,30 +872,66 @@ class Server(Struct):
return ev.is_set()
def pformat(self) -> str:
@property
def repr_state(self) -> str:
'''
A `str`-status describing the current state of this
IPC server in terms of the current operating "phase".
'''
status = 'server is active'
if self.has_peers():
peer_cnt: int = self.len_peers()
status: str = (
f'{peer_cnt!r} peer chans'
)
else:
status: str = 'No peer chans'
if self.is_shutdown():
status: str = 'server-shutdown'
return status
def pformat(
self,
privates: bool = False,
) -> str:
eps: list[Endpoint] = self._endpoints
state_repr: str = (
f'{len(eps)!r} IPC-endpoints active'
)
# state_repr: str = (
# f'{len(eps)!r} endpoints active'
# )
fmtstr = (
f' |_state: {state_repr}\n'
f' no_more_peers: {self.has_peers()}\n'
f' |_state: {self.repr_state!r}\n'
)
if privates:
fmtstr += f' no_more_peers: {self.has_peers()}\n'
if self._shutdown is not None:
shutdown_stats: EventStatistics = self._shutdown.statistics()
fmtstr += (
f' task_waiting_on_shutdown: {shutdown_stats}\n'
)
if eps := self._endpoints:
addrs: list[tuple] = [
ep.addr for ep in eps
]
repr_eps: str = ppfmt(addrs)
fmtstr += (
# TODO, use the `ppfmt()` helper from `modden`!
f' |_endpoints: {pformat(self._endpoints)}\n'
f' |_peers: {len(self._peers)} connected\n'
f' |_endpoints: {repr_eps}\n'
# ^TODO? how to indent closing ']'..
)
if peers := self._peers:
fmtstr += (
f' |_peers: {len(peers)} connected\n'
)
return (
f'<IPCServer(\n'
f'<Server(\n'
f'{fmtstr}'
f')>\n'
)
@ -885,8 +990,8 @@ class Server(Struct):
)
log.runtime(
f'Binding to endpoints for,\n'
f'{accept_addrs}\n'
f'Binding endpoints\n'
f'{ppfmt(accept_addrs)}\n'
)
eps: list[Endpoint] = await self._parent_tn.start(
partial(
@ -896,13 +1001,19 @@ class Server(Struct):
listen_addrs=accept_addrs,
)
)
self._endpoints.extend(eps)
serv_repr: str = nest_from_op(
input_op='(>',
text=self.pformat(),
nest_indent=1,
)
log.runtime(
f'Started IPC endpoints\n'
f'{eps}\n'
f'Started IPC server\n'
f'{serv_repr}'
)
self._endpoints.extend(eps)
# XXX, just a little bit of sanity
# XXX, a little sanity on new ep allocations
group_tn: Nursery|None = None
ep: Endpoint
for ep in eps:
@ -956,9 +1067,13 @@ async def _serve_ipc_eps(
stream_handler_tn=stream_handler_tn,
)
try:
ep_sclang: str = nest_from_op(
input_op='>[',
text=f'{ep.pformat()}',
)
log.runtime(
f'Starting new endpoint listener\n'
f'{ep}\n'
f'{ep_sclang}\n'
)
listener: trio.abc.Listener = await ep.start_listener()
assert listener is ep._listener
@ -996,17 +1111,6 @@ async def _serve_ipc_eps(
handler_nursery=stream_handler_tn
)
)
# TODO, wow make this message better! XD
log.runtime(
'Started server(s)\n'
+
'\n'.join([f'|_{addr}' for addr in listen_addrs])
)
log.runtime(
f'Started IPC endpoints\n'
f'{eps}\n'
)
task_status.started(
eps,
)
@ -1049,8 +1153,7 @@ async def open_ipc_server(
try:
yield ipc_server
log.runtime(
f'Waiting on server to shutdown or be cancelled..\n'
f'{ipc_server}'
'Server-tn running until terminated\n'
)
# TODO? when if ever would we want/need this?
# with trio.CancelScope(shield=True):

View File

@ -127,10 +127,9 @@ async def start_listener(
Start a TCP socket listener on the given `TCPAddress`.
'''
log.info(
f'Attempting to bind TCP socket\n'
f'>[\n'
f'|_{addr}\n'
log.runtime(
f'Trying socket bind\n'
f'>[ {addr}\n'
)
# ?TODO, maybe we should just change the lower-level call this is
# using internall per-listener?
@ -145,11 +144,10 @@ async def start_listener(
assert len(listeners) == 1
listener = listeners[0]
host, port = listener.socket.getsockname()[:2]
bound_addr: TCPAddress = type(addr).from_addr((host, port))
log.info(
f'Listening on TCP socket\n'
f'[>\n'
f' |_{addr}\n'
f'[> {bound_addr}\n'
)
return listener

View File

@ -81,10 +81,35 @@ BOLD_PALETTE = {
}
def at_least_level(
log: Logger|LoggerAdapter,
level: int|str,
) -> bool:
'''
Predicate to test if a given level is active.
'''
if isinstance(level, str):
level: int = CUSTOM_LEVELS[level.upper()]
if log.getEffectiveLevel() <= level:
return True
return False
# TODO: this isn't showing the correct '{filename}'
# as it did before..
class StackLevelAdapter(LoggerAdapter):
def at_least_level(
self,
level: str,
) -> bool:
return at_least_level(
log=self,
level=level,
)
def transport(
self,
msg: str,
@ -401,19 +426,3 @@ def get_loglevel() -> str:
# global module logger for tractor itself
log: StackLevelAdapter = get_logger('tractor')
def at_least_level(
log: Logger|LoggerAdapter,
level: int|str,
) -> bool:
'''
Predicate to test if a given level is active.
'''
if isinstance(level, str):
level: int = CUSTOM_LEVELS[level.upper()]
if log.getEffectiveLevel() <= level:
return True
return False

View File

@ -177,6 +177,16 @@ class Aid(
f'{self.name}@{self.pid!r}'
)
# mk hashable via `.uuid`
def __hash__(self) -> int:
return hash(self.uuid)
def __eq__(self, other: Aid) -> bool:
return self.uuid == other.uuid
# use pretty fmt since often repr-ed for console/log
__repr__ = pretty_struct.Struct.__repr__
class SpawnSpec(
pretty_struct.Struct,

View File

@ -60,6 +60,9 @@ def find_masked_excs(
return None
# XXX, relevant ish discussion @ `trio`-core,
# https://github.com/python-trio/trio/issues/455#issuecomment-2785122216
#
@acm
async def maybe_raise_from_masking_exc(
tn: trio.Nursery|None = None,