Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet c0058024c2 Add todo for py3.13+ `.shared_memory`'s new `track=False` support.. finally they added it XD 2025-07-04 00:08:52 -04:00
Tyler Goodlet 065104401c Even more `.ipc.*` repr refinements
Mostly adjusting indentation, noise level, and clarity via `.pformat()`
tweaks and more general use of `.devx.pformat.nest_from_op()`.

Specific impl deats,
- use `pformat.ppfmt()`/`nest_from_op()` more seriously throughout
  `._server`.
- add a `._server.Endpoint.pformat()`.
- add `._server.Server.len_peers()` and `.repr_state()`.
- polish `Server.pformat()`.
- drop some redundant `log.runtime()`s from `._serve_ipc_eps()`, instead
  leaving/putting them only in the caller pub meth.
- `._tcp.start_listener()`: log the bound addr, not the input (which may
  be the 0-port).
2025-07-03 23:33:02 -04:00
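
(A rough sketch of the new `._server.Server` surface described above; how the `server` handle is obtained is assumed and not part of this diff.)

    server: Server = ...                   # however the runtime exposes it
    print(server.len_peers())              # count of connected peer chans
    print(server.repr_state)               # e.g. '2 peer chans' or 'server-shutdown'
    print(server.pformat(privates=False))  # '<Server(...)>' summary block
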
Tyler Goodlet 3201437f4e More `.ipc.Channel`-repr related tweaks
- only generate a repr in `.from_addr()` when log level is >= 'runtime'.
 |_ add a todo about supporting this optimization more generally on our
   adapter.
- fix `Channel.pformat()` to show unknown peer field line fmt correctly.
- add a `Channel.maddr: str` which just delegates directly to the
  `._transport` like other pass-thru property fields.
2025-06-29 15:47:42 -04:00
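
(Sketch of the level-gated repr pattern this commit adds in `Channel.from_addr()`; it mirrors the hunk further below and assumes a `log: StackLevelAdapter` plus a freshly-connected `chan: Channel` in scope.)

    # gate the (potentially expensive) repr-calc on the log level,
    # via the `at_least_level()` predicate also added in this series.
    if log.at_least_level('runtime'):
        from tractor.devx import pformat as _pformat
        chan_repr: str = _pformat.nest_from_op(
            input_op='[>',
            text=chan.pformat(),
            nest_indent=1,
        )
        log.runtime(
            f'Connected channel IPC transport\n'
            f'{chan_repr}'
        )
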
Tyler Goodlet a9da16892d Mk `Aid` hashable, use pretty-`.__repr__()`
Hash on the `.uuid: str` and delegate verbatim to
`msg.pretty_struct.Struct`'s equiv method.
2025-06-29 15:39:09 -04:00
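
(Roughly what the hashability change amounts to; sketch only, mirroring the `msg/types.py` hunk near the end of this diff, with the base class assumed from the neighboring `SpawnSpec` definition.)

    class Aid(pretty_struct.Struct):
        ...
        # hash/eq on the (presumed unique) `.uuid: str`
        def __hash__(self) -> int:
            return hash(self.uuid)

        def __eq__(self, other: Aid) -> bool:
            return self.uuid == other.uuid

        # keep the pretty-struct repr since `Aid`s are often
        # rendered in console/log output.
        __repr__ = pretty_struct.Struct.__repr__
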
Tyler Goodlet 1b609113c3 .trionics: link in `finally`-footgun `trio` GH ish 2025-06-29 15:34:10 -04:00
Tyler Goodlet 4a80cda841 .log: expose `at_least_level()` as `StackLevelAdapter` meth 2025-06-29 15:33:31 -04:00
Tyler Goodlet 131e2ee0a4 Drop `actor_info: str` from `._entry` logs 2025-06-29 14:59:50 -04:00
Tyler Goodlet 79ef973058 Try `nest_from_op()` in some `._rpc` spots
To start trying out,
- using it in the `Start`-msg handler-block to repr the msg coming
  *from* a `repr(Channel)` using the `'<=)'` sclang op.
- for a completed RPC task in `_invoke_non_context()`.
- for the msg loop task's termination report.
2025-06-29 14:47:03 -04:00
Tyler Goodlet c738492879 Hide more `Channel._transport` privates for repr
Such as the `MsgTransport.stream` and `.drain` attrs since they're
rarely that important at the chan level. Also start adopting
a `.<attr>=` style for actual attrs of the type versus a
`<name>: ` style for meta-field info lines.
2025-06-29 13:41:10 -04:00
Tyler Goodlet a931274da6 Moar `nest_from_op()` tweaks..
- better `nest_indent` logic where we either get a non-zero value and
  apply it strictly to both the `nest_prefix` and `text`, OR we
  auto-calc it from any `nest_prefix`, NOT a conflation of both..
- add a `rm_from_first_ln: str` which allows removing chars from the
  first line of `text` after a `str.strip()` (handy for removing
  the '<Channel' first chevron from type-reprs).
2025-06-29 13:37:32 -04:00
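
(A hedged usage sketch of the new knobs, taken from how `._server` uses them below; `chan` is an assumed pre-existing `Channel` instance and the rendered output is approximate.)

    from tractor.devx.pformat import nest_from_op

    op: str = '<=x '
    print(nest_from_op(
        input_op=op,
        op_suffix='',
        nest_prefix='',
        text=chan.pformat(),      # some '<Channel(...' style type-repr
        nest_indent=len(op) - 1,  # explicit indent, applied strictly
        rm_from_first_ln='<',     # drop the leading '<' chevron from line 1
    ))
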
10 changed files with 394 additions and 178 deletions

View File

@ -21,7 +21,7 @@ Sub-process entry points.
from __future__ import annotations from __future__ import annotations
from functools import partial from functools import partial
import multiprocessing as mp import multiprocessing as mp
import os # import os
from typing import ( from typing import (
Any, Any,
TYPE_CHECKING, TYPE_CHECKING,
@ -38,6 +38,7 @@ from .devx import (
_frame_stack, _frame_stack,
pformat, pformat,
) )
# from .msg import pretty_struct
from .to_asyncio import run_as_asyncio_guest from .to_asyncio import run_as_asyncio_guest
from ._addr import UnwrappedAddress from ._addr import UnwrappedAddress
from ._runtime import ( from ._runtime import (
@ -127,20 +128,13 @@ def _trio_main(
if actor.loglevel is not None: if actor.loglevel is not None:
get_console_log(actor.loglevel) get_console_log(actor.loglevel)
actor_info: str = (
f'|_{actor}\n'
f' uid: {actor.uid}\n'
f' pid: {os.getpid()}\n'
f' parent_addr: {parent_addr}\n'
f' loglevel: {actor.loglevel}\n'
)
log.info( log.info(
'Starting new `trio` subactor\n' f'Starting `trio` subactor from parent @ '
f'{parent_addr}\n'
+ +
pformat.nest_from_op( pformat.nest_from_op(
input_op='>(', # see syntax ideas above input_op='>(', # see syntax ideas above
text=actor_info, text=f'{actor}',
nest_indent=2, # since "complete"
) )
) )
logmeth = log.info logmeth = log.info
@ -149,7 +143,7 @@ def _trio_main(
+ +
pformat.nest_from_op( pformat.nest_from_op(
input_op=')>', # like a "closed-to-play"-icon from super perspective input_op=')>', # like a "closed-to-play"-icon from super perspective
text=actor_info, text=f'{actor}',
nest_indent=1, nest_indent=1,
) )
) )
@ -167,7 +161,7 @@ def _trio_main(
+ +
pformat.nest_from_op( pformat.nest_from_op(
input_op='c)>', # closed due to cancel (see above) input_op='c)>', # closed due to cancel (see above)
text=actor_info, text=f'{actor}',
) )
) )
except BaseException as err: except BaseException as err:
@ -177,7 +171,7 @@ def _trio_main(
+ +
pformat.nest_from_op( pformat.nest_from_op(
input_op='x)>', # closed by error input_op='x)>', # closed by error
text=actor_info, text=f'{actor}',
) )
) )
# NOTE since we raise a tb will already be shown on the # NOTE since we raise a tb will already be shown on the

View File

@ -64,6 +64,7 @@ from .trionics import (
from .devx import ( from .devx import (
debug, debug,
add_div, add_div,
pformat as _pformat,
) )
from . import _state from . import _state
from .log import get_logger from .log import get_logger
@ -72,7 +73,7 @@ from .msg import (
MsgCodec, MsgCodec,
PayloadT, PayloadT,
NamespacePath, NamespacePath,
# pretty_struct, pretty_struct,
_ops as msgops, _ops as msgops,
) )
from tractor.msg.types import ( from tractor.msg.types import (
@ -220,11 +221,18 @@ async def _invoke_non_context(
task_status.started(ctx) task_status.started(ctx)
result = await coro result = await coro
fname: str = func.__name__ fname: str = func.__name__
op_nested_task: str = _pformat.nest_from_op(
input_op=f')> cid: {ctx.cid!r}',
text=f'{ctx._task}',
nest_indent=1, # under >
)
log.runtime( log.runtime(
'RPC complete:\n' f'RPC task complete\n'
f'task: {ctx._task}\n' f'\n'
f'|_cid={ctx.cid}\n' f'{op_nested_task}\n'
f'|_{fname}() -> {pformat(result)}\n' f'\n'
f')> {fname}() -> {pformat(result)}\n'
) )
# NOTE: only send result if we know IPC isn't down # NOTE: only send result if we know IPC isn't down
@ -1043,7 +1051,7 @@ async def process_messages(
): ):
target_cid: str = kwargs['cid'] target_cid: str = kwargs['cid']
kwargs |= { kwargs |= {
'requesting_uid': chan.uid, 'requesting_aid': chan.aid,
'ipc_msg': msg, 'ipc_msg': msg,
# XXX NOTE! ONLY the rpc-task-owning # XXX NOTE! ONLY the rpc-task-owning
@ -1079,21 +1087,34 @@ async def process_messages(
ns=ns, ns=ns,
func=funcname, func=funcname,
kwargs=kwargs, # type-spec this? see `msg.types` kwargs=kwargs, # type-spec this? see `msg.types`
uid=actorid, uid=actor_uuid,
): ):
if actor_uuid != chan.aid.uid:
raise RuntimeError(
f'IPC <Start> msg <-> chan.aid mismatch!?\n'
f'Channel.aid = {chan.aid!r}\n'
f'Start.uid = {actor_uuid!r}\n'
)
# await debug.pause()
op_repr: str = 'Start <=) '
req_repr: str = _pformat.nest_from_op(
input_op=op_repr,
op_suffix='',
nest_prefix='',
text=f'{chan}',
nest_indent=len(op_repr)-1,
rm_from_first_ln='<',
# ^XXX, subtract -1 to account for
# <Channel
# ^_chevron to be stripped
)
start_status: str = ( start_status: str = (
'Handling RPC `Start` request\n' 'Handling RPC request\n'
f'<= peer: {actorid}\n\n' f'{req_repr}\n'
f' |_{chan}\n' f'\n'
f' |_cid: {cid}\n\n' f'->{{ ipc-context-id: {cid!r}\n'
# f' |_{ns}.{funcname}({kwargs})\n' f'->{{ nsp for fn: `{ns}.{funcname}({kwargs})`\n'
f'>> {actor.uid}\n'
f' |_{actor}\n'
f' -> nsp: `{ns}.{funcname}({kwargs})`\n'
# f' |_{ns}.{funcname}({kwargs})\n\n'
# f'{pretty_struct.pformat(msg)}\n'
) )
# runtime-internal endpoint: `Actor.<funcname>` # runtime-internal endpoint: `Actor.<funcname>`
@ -1122,10 +1143,6 @@ async def process_messages(
await chan.send(err_msg) await chan.send(err_msg)
continue continue
start_status += (
f' -> func: {func}\n'
)
# schedule a task for the requested RPC function # schedule a task for the requested RPC function
# in the actor's main "service nursery". # in the actor's main "service nursery".
# #
@ -1133,7 +1150,7 @@ async def process_messages(
# supervision isolation? would avoid having to # supervision isolation? would avoid having to
# manage RPC tasks individually in `._rpc_tasks` # manage RPC tasks individually in `._rpc_tasks`
# table? # table?
start_status += ' -> scheduling new task..\n' start_status += '->( scheduling new task..\n'
log.runtime(start_status) log.runtime(start_status)
try: try:
ctx: Context = await actor._service_n.start( ctx: Context = await actor._service_n.start(
@ -1222,7 +1239,7 @@ async def process_messages(
f'|_{chan}\n' f'|_{chan}\n'
) )
await actor.cancel_rpc_tasks( await actor.cancel_rpc_tasks(
req_uid=actor.uid, req_aid=actor.aid,
# a "self cancel" in terms of the lifetime of the # a "self cancel" in terms of the lifetime of the
# IPC connection which is presumed to be the # IPC connection which is presumed to be the
# source of any requests for spawned tasks. # source of any requests for spawned tasks.
@ -1294,13 +1311,37 @@ async def process_messages(
finally: finally:
# msg debugging for when he machinery is brokey # msg debugging for when he machinery is brokey
if msg is None: if msg is None:
message: str = 'Exiting IPC msg loop without receiving a msg?' message: str = 'Exiting RPC-loop without receiving a msg?'
else: else:
task_op_repr: str = ')>'
task: trio.Task = trio.lowlevel.current_task()
# maybe add cancelled opt prefix
if task._cancel_status.effectively_cancelled:
task_op_repr = 'c' + task_op_repr
task_repr: str = _pformat.nest_from_op(
input_op=task_op_repr,
text=f'{task!r}',
nest_indent=1,
)
# chan_op_repr: str = '<=} '
# chan_repr: str = _pformat.nest_from_op(
# input_op=chan_op_repr,
# op_suffix='',
# nest_prefix='',
# text=chan.pformat(),
# nest_indent=len(chan_op_repr)-1,
# rm_from_first_ln='<',
# )
message: str = ( message: str = (
'Exiting IPC msg loop with final msg\n\n' f'Exiting RPC-loop with final msg\n'
f'<= peer: {chan.uid}\n' f'\n'
f' |_{chan}\n\n' # f'{chan_repr}\n'
# f'{pretty_struct.pformat(msg)}' f'{task_repr}\n'
f'\n'
f'{pretty_struct.pformat(msg)}'
f'\n'
) )
log.runtime(message) log.runtime(message)

View File

@ -262,6 +262,7 @@ def nest_from_op(
nest_indent: int|None = None, nest_indent: int|None = None,
# XXX indent `next_prefix` "to-the-right-of" `input_op` # XXX indent `next_prefix` "to-the-right-of" `input_op`
# by this count of whitespaces (' '). # by this count of whitespaces (' ').
rm_from_first_ln: str|None = None,
) -> str: ) -> str:
''' '''
@ -346,20 +347,35 @@ def nest_from_op(
if ( if (
nest_prefix nest_prefix
and and
nest_indent nest_indent != 0
): ):
nest_prefix: str = textwrap.indent( if nest_indent is not None:
nest_prefix, nest_prefix: str = textwrap.indent(
prefix=nest_indent*' ', nest_prefix,
) prefix=nest_indent*' ',
)
nest_indent: int = len(nest_prefix)
# determine body-text indent either by,
# - using wtv explicit indent value is provided,
# OR
# - auto-calcing the indent to embed `text` under
# the `nest_prefix` if provided, **IFF** `nest_indent=None`.
tree_str_indent: int = 0
if nest_indent not in {0, None}:
tree_str_indent = nest_indent
elif (
nest_prefix
and
nest_indent != 0
):
tree_str_indent = len(nest_prefix)
indented_tree_str: str = text indented_tree_str: str = text
tree_str_indent: int = 0 if tree_str_indent:
if nest_indent != 0:
tree_str_indent: int = len(nest_prefix)
indented_tree_str: str = textwrap.indent( indented_tree_str: str = textwrap.indent(
text, text,
prefix=' '*tree_str_indent prefix=' '*tree_str_indent,
) )
# inject any provided nesting-prefix chars # inject any provided nesting-prefix chars
@ -369,18 +385,35 @@ def nest_from_op(
f'{nest_prefix}{indented_tree_str[tree_str_indent:]}' f'{nest_prefix}{indented_tree_str[tree_str_indent:]}'
) )
if (
not prefix_op
or
rm_from_first_ln
):
tree_lns: list[str] = indented_tree_str.splitlines()
first: str = tree_lns[0]
if rm_from_first_ln:
first = first.strip().replace(
rm_from_first_ln,
'',
)
indented_tree_str: str = '\n'.join(tree_lns[1:])
if prefix_op:
indented_tree_str = (
f'{first}\n'
f'{indented_tree_str}'
)
if prefix_op: if prefix_op:
return ( return (
f'{input_op}{op_suffix}' f'{input_op}{op_suffix}'
f'{indented_tree_str}' f'{indented_tree_str}'
) )
else: else:
tree_lns: list[str] = indented_tree_str.splitlines()
first: str = tree_lns[0]
rest: str = '\n'.join(tree_lns[1:])
return ( return (
f'{first}{input_op}{op_suffix}' f'{first}{input_op}{op_suffix}'
f'{rest}' f'{indented_tree_str}'
) )

View File

@ -171,11 +171,23 @@ class Channel:
) )
assert transport.raddr == addr assert transport.raddr == addr
chan = Channel(transport=transport) chan = Channel(transport=transport)
log.runtime(
f'Connected channel IPC transport\n' # ?TODO, compact this into adapter level-methods?
f'[>\n' # -[ ] would avoid extra repr-calcs if level not active?
f' |_{chan}\n' # |_ how would the `calc_if_level` look though? func?
) if log.at_least_level('runtime'):
from tractor.devx import (
pformat as _pformat,
)
chan_repr: str = _pformat.nest_from_op(
input_op='[>',
text=chan.pformat(),
nest_indent=1,
)
log.runtime(
f'Connected channel IPC transport\n'
f'{chan_repr}'
)
return chan return chan
@cm @cm
@ -218,17 +230,19 @@ class Channel:
if privates else '' if privates else ''
) + ( # peer-actor (processs) section ) + ( # peer-actor (processs) section
f' |_peer: {self.aid.reprol()!r}\n' f' |_peer: {self.aid.reprol()!r}\n'
if self.aid else '<unknown>' if self.aid else ' |_peer: <unknown>\n'
) + ( ) + (
f' |_msgstream: {tpt_name}\n' f' |_msgstream: {tpt_name}\n'
f' proto={tpt.laddr.proto_key!r}\n' f' maddr: {tpt.maddr!r}\n'
f' layer={tpt.layer_key!r}\n' f' proto: {tpt.laddr.proto_key!r}\n'
f' laddr={tpt.laddr}\n' f' layer: {tpt.layer_key!r}\n'
f' raddr={tpt.raddr}\n' f' codec: {tpt.codec_key!r}\n'
f' codec={tpt.codec_key!r}\n' f' .laddr={tpt.laddr}\n'
f' stream={tpt.stream}\n' f' .raddr={tpt.raddr}\n'
f' maddr={tpt.maddr!r}\n' ) + (
f' drained={tpt.drained}\n' f' ._transport.stream={tpt.stream}\n'
f' ._transport.drained={tpt.drained}\n'
if privates else ''
) + ( ) + (
f' _send_lock={tpt._send_lock.statistics()}\n' f' _send_lock={tpt._send_lock.statistics()}\n'
if privates else '' if privates else ''
@ -257,6 +271,10 @@ class Channel:
def raddr(self) -> Address|None: def raddr(self) -> Address|None:
return self._transport.raddr if self._transport else None return self._transport.raddr if self._transport else None
@property
def maddr(self) -> str:
return self._transport.maddr if self._transport else '<no-tpt>'
# TODO: something like, # TODO: something like,
# `pdbp.hideframe_on(errors=[MsgTypeError])` # `pdbp.hideframe_on(errors=[MsgTypeError])`
# instead of the `try/except` hack we have rn.. # instead of the `try/except` hack we have rn..
@ -444,8 +462,8 @@ class Channel:
await self.send(aid) await self.send(aid)
peer_aid: Aid = await self.recv() peer_aid: Aid = await self.recv()
log.runtime( log.runtime(
f'Received hanshake with peer actor,\n' f'Received hanshake with peer\n'
f'{peer_aid}\n' f'<= {peer_aid.reprol(sin_uuid=False)}\n'
) )
# NOTE, we always are referencing the remote peer! # NOTE, we always are referencing the remote peer!
self.aid = peer_aid self.aid = peer_aid

View File

@ -17,9 +17,16 @@
Utils to tame mp non-SC madeness Utils to tame mp non-SC madeness
''' '''
# !TODO! in 3.13 this can be disabled (the-same/similarly) using
# a flag,
# - [ ] soo if it works like this, drop this module entirely for
# 3.13+ B)
# |_https://docs.python.org/3/library/multiprocessing.shared_memory.html
#
def disable_mantracker(): def disable_mantracker():
''' '''
Disable all ``multiprocessing``` "resource tracking" machinery since Disable all `multiprocessing` "resource tracking" machinery since
it's an absolute multi-threaded mess of non-SC madness. it's an absolute multi-threaded mess of non-SC madness.
''' '''
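
(For reference, the 3.13+ alternative the new TODO points at looks roughly like this; untested sketch, the `track=False` kwarg landed on `SharedMemory` in CPython 3.13.)

    from multiprocessing.shared_memory import SharedMemory

    # with `track=False` the segment is never registered with the
    # `multiprocessing` resource tracker, so the monkey-patching in
    # this module shouldn't be needed on 3.13+.
    shm = SharedMemory(create=True, size=4096, track=False)
    try:
        shm.buf[:5] = b'hello'
    finally:
        shm.close()
        shm.unlink()
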

View File

@ -26,7 +26,7 @@ from contextlib import (
from functools import partial from functools import partial
from itertools import chain from itertools import chain
import inspect import inspect
from pprint import pformat import textwrap
from types import ( from types import (
ModuleType, ModuleType,
) )
@ -43,7 +43,10 @@ from trio import (
SocketListener, SocketListener,
) )
# from ..devx import debug from ..devx.pformat import (
ppfmt,
nest_from_op,
)
from .._exceptions import ( from .._exceptions import (
TransportClosed, TransportClosed,
) )
@ -141,9 +144,8 @@ async def maybe_wait_on_canced_subs(
): ):
log.cancel( log.cancel(
'Waiting on cancel request to peer..\n' 'Waiting on cancel request to peer\n'
f'c)=>\n' f'c)=> {chan.aid.reprol()}@[{chan.maddr}]\n'
f' |_{chan.aid}\n'
) )
# XXX: this is a soft wait on the channel (and its # XXX: this is a soft wait on the channel (and its
@ -179,7 +181,7 @@ async def maybe_wait_on_canced_subs(
log.warning( log.warning(
'Draining msg from disconnected peer\n' 'Draining msg from disconnected peer\n'
f'{chan_info}' f'{chan_info}'
f'{pformat(msg)}\n' f'{ppfmt(msg)}\n'
) )
# cid: str|None = msg.get('cid') # cid: str|None = msg.get('cid')
cid: str|None = msg.cid cid: str|None = msg.cid
@ -248,7 +250,7 @@ async def maybe_wait_on_canced_subs(
if children := local_nursery._children: if children := local_nursery._children:
# indent from above local-nurse repr # indent from above local-nurse repr
report += ( report += (
f' |_{pformat(children)}\n' f' |_{ppfmt(children)}\n'
) )
log.warning(report) log.warning(report)
@ -279,8 +281,9 @@ async def maybe_wait_on_canced_subs(
log.runtime( log.runtime(
f'Peer IPC broke but subproc is alive?\n\n' f'Peer IPC broke but subproc is alive?\n\n'
f'<=x {chan.aid}@{chan.raddr}\n' f'<=x {chan.aid.reprol()}@[{chan.maddr}]\n'
f' |_{proc}\n' f'\n'
f'{proc}\n'
) )
return local_nursery return local_nursery
@ -324,9 +327,10 @@ async def handle_stream_from_peer(
chan = Channel.from_stream(stream) chan = Channel.from_stream(stream)
con_status: str = ( con_status: str = (
'New inbound IPC connection <=\n' f'New inbound IPC transport connection\n'
f'|_{chan}\n' f'<=( {stream!r}\n'
) )
con_status_steps: str = ''
# initial handshake with peer phase # initial handshake with peer phase
try: try:
@ -372,7 +376,7 @@ async def handle_stream_from_peer(
if _pre_chan := server._peers.get(uid): if _pre_chan := server._peers.get(uid):
familiar: str = 'pre-existing-peer' familiar: str = 'pre-existing-peer'
uid_short: str = f'{uid[0]}[{uid[1][-6:]}]' uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
con_status += ( con_status_steps += (
f' -> Handshake with {familiar} `{uid_short}` complete\n' f' -> Handshake with {familiar} `{uid_short}` complete\n'
) )
@ -397,7 +401,7 @@ async def handle_stream_from_peer(
None, None,
) )
if event: if event:
con_status += ( con_status_steps += (
' -> Waking subactor spawn waiters: ' ' -> Waking subactor spawn waiters: '
f'{event.statistics().tasks_waiting}\n' f'{event.statistics().tasks_waiting}\n'
f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n' f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
@ -408,7 +412,7 @@ async def handle_stream_from_peer(
event.set() event.set()
else: else:
con_status += ( con_status_steps += (
f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n' f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
) # type: ignore ) # type: ignore
@ -422,8 +426,15 @@ async def handle_stream_from_peer(
# TODO: can we just use list-ref directly? # TODO: can we just use list-ref directly?
chans.append(chan) chans.append(chan)
con_status += ' -> Entering RPC msg loop..\n' con_status_steps += ' -> Entering RPC msg loop..\n'
log.runtime(con_status) log.runtime(
con_status
+
textwrap.indent(
con_status_steps,
prefix=' '*3, # align to first-ln
)
)
# Begin channel management - respond to remote requests and # Begin channel management - respond to remote requests and
# process received reponses. # process received reponses.
@ -456,41 +467,67 @@ async def handle_stream_from_peer(
disconnected=disconnected, disconnected=disconnected,
) )
# ``Channel`` teardown and closure sequence # `Channel` teardown and closure sequence
# drop ref to channel so it can be gc-ed and disconnected # drop ref to channel so it can be gc-ed and disconnected
con_teardown_status: str = ( #
f'IPC channel disconnected:\n' # -[x]TODO mk this be like
f'<=x uid: {chan.aid}\n' # <=x Channel(
f' |_{pformat(chan)}\n\n' # |_field: blah
# )>
op_repr: str = '<=x '
chan_repr: str = nest_from_op(
input_op=op_repr,
op_suffix='',
nest_prefix='',
text=chan.pformat(),
nest_indent=len(op_repr)-1,
rm_from_first_ln='<',
) )
con_teardown_status: str = (
f'IPC channel disconnect\n'
f'\n'
f'{chan_repr}\n'
f'\n'
)
chans.remove(chan) chans.remove(chan)
# TODO: do we need to be this pedantic? # TODO: do we need to be this pedantic?
if not chans: if not chans:
con_teardown_status += ( con_teardown_status += (
f'-> No more channels with {chan.aid}' f'-> No more channels with {chan.aid.reprol()!r}\n'
) )
server._peers.pop(uid, None) server._peers.pop(uid, None)
peers_str: str = '' if peers := list(server._peers.values()):
for uid, chans in server._peers.items(): peer_cnt: int = len(peers)
peers_str += ( if (
f'uid: {uid}\n' (first := peers[0][0]) is not chan
) and
for i, chan in enumerate(chans): not disconnected
peers_str += ( and
f' |_[{i}] {pformat(chan)}\n' peer_cnt > 1
):
con_teardown_status += (
f'-> Remaining IPC {peer_cnt-1!r} peers:\n'
) )
for chans in server._peers.values():
con_teardown_status += ( first: Channel = chans[0]
f'-> Remaining IPC {len(server._peers)} peers: {peers_str}\n' if not (
) first is chan
and
disconnected
):
con_teardown_status += (
f' |_{first.aid.reprol()!r} -> {len(chans)!r} chans\n'
)
# No more channels to other actors (at all) registered # No more channels to other actors (at all) registered
# as connected. # as connected.
if not server._peers: if not server._peers:
con_teardown_status += ( con_teardown_status += (
'Signalling no more peer channel connections' '-> Signalling no more peer connections!\n'
) )
server._no_more_peers.set() server._no_more_peers.set()
@ -579,10 +616,10 @@ async def handle_stream_from_peer(
class Endpoint(Struct): class Endpoint(Struct):
''' '''
An instance of an IPC "bound" address where the lifetime of the An instance of an IPC "bound" address where the lifetime of an
"ability to accept connections" (from clients) and then handle "ability to accept connections" and handle the subsequent
those inbound sessions or sequences-of-packets is determined by sequence-of-packets (maybe oriented as sessions) is determined by
a (maybe pair of) nurser(y/ies). the underlying nursery scope(s).
''' '''
addr: Address addr: Address
@ -600,6 +637,24 @@ class Endpoint(Struct):
MsgTransport, # handle to encoded-msg transport stream MsgTransport, # handle to encoded-msg transport stream
] = {} ] = {}
def pformat(
self,
indent: int = 0,
privates: bool = False,
) -> str:
type_repr: str = type(self).__name__
fmtstr: str = (
# !TODO, always be ns aware!
# f'|_netns: {netns}\n'
f' |.addr: {self.addr!r}\n'
f' |_peers: {len(self.peer_tpts)}\n'
)
return (
f'<{type_repr}(\n'
f'{fmtstr}'
f')>'
)
async def start_listener(self) -> SocketListener: async def start_listener(self) -> SocketListener:
tpt_mod: ModuleType = inspect.getmodule(self.addr) tpt_mod: ModuleType = inspect.getmodule(self.addr)
lstnr: SocketListener = await tpt_mod.start_listener( lstnr: SocketListener = await tpt_mod.start_listener(
@ -639,11 +694,13 @@ class Endpoint(Struct):
class Server(Struct): class Server(Struct):
_parent_tn: Nursery _parent_tn: Nursery
_stream_handler_tn: Nursery _stream_handler_tn: Nursery
# level-triggered sig for whether "no peers are currently # level-triggered sig for whether "no peers are currently
# connected"; field is **always** set to an instance but # connected"; field is **always** set to an instance but
# initialized with `.is_set() == True`. # initialized with `.is_set() == True`.
_no_more_peers: trio.Event _no_more_peers: trio.Event
# active eps as allocated by `.listen_on()`
_endpoints: list[Endpoint] = [] _endpoints: list[Endpoint] = []
# connection tracking & mgmt # connection tracking & mgmt
@ -651,12 +708,19 @@ class Server(Struct):
str, # uaid str, # uaid
list[Channel], # IPC conns from peer list[Channel], # IPC conns from peer
] = defaultdict(list) ] = defaultdict(list)
# events-table with entries registered unset while the local
# actor is waiting on a new actor to inbound connect, often
# a parent waiting on its child just after spawn.
_peer_connected: dict[ _peer_connected: dict[
tuple[str, str], tuple[str, str],
trio.Event, trio.Event,
] = {} ] = {}
# syncs for setup/teardown sequences # syncs for setup/teardown sequences
# - null when not yet booted,
# - unset when active,
# - set when fully shutdown with 0 eps active.
_shutdown: trio.Event|None = None _shutdown: trio.Event|None = None
# TODO, maybe just make `._endpoints: list[Endpoint]` and # TODO, maybe just make `._endpoints: list[Endpoint]` and
@ -664,7 +728,6 @@ class Server(Struct):
# @property # @property
# def addrs2eps(self) -> dict[Address, Endpoint]: # def addrs2eps(self) -> dict[Address, Endpoint]:
# ... # ...
@property @property
def proto_keys(self) -> list[str]: def proto_keys(self) -> list[str]:
return [ return [
@ -690,7 +753,7 @@ class Server(Struct):
# TODO: obvi a different server type when we eventually # TODO: obvi a different server type when we eventually
# support some others XD # support some others XD
log.runtime( log.runtime(
f'Cancelling server(s) for\n' f'Cancelling server(s) for tpt-protos\n'
f'{self.proto_keys!r}\n' f'{self.proto_keys!r}\n'
) )
self._parent_tn.cancel_scope.cancel() self._parent_tn.cancel_scope.cancel()
@ -717,6 +780,14 @@ class Server(Struct):
f'protos: {tpt_protos!r}\n' f'protos: {tpt_protos!r}\n'
) )
def len_peers(
self,
) -> int:
return len([
chan.connected()
for chan in chain(*self._peers.values())
])
def has_peers( def has_peers(
self, self,
check_chans: bool = False, check_chans: bool = False,
@ -730,13 +801,11 @@ class Server(Struct):
has_peers has_peers
and and
check_chans check_chans
and
(peer_cnt := self.len_peers())
): ):
has_peers: bool = ( has_peers: bool = (
any(chan.connected() peer_cnt > 0
for chan in chain(
*self._peers.values()
)
)
and and
has_peers has_peers
) )
@ -803,30 +872,66 @@ class Server(Struct):
return ev.is_set() return ev.is_set()
def pformat(self) -> str: @property
def repr_state(self) -> str:
'''
A `str`-status describing the current state of this
IPC server in terms of the current operating "phase".
'''
status = 'server is active'
if self.has_peers():
peer_cnt: int = self.len_peers()
status: str = (
f'{peer_cnt!r} peer chans'
)
else:
status: str = 'No peer chans'
if self.is_shutdown():
status: str = 'server-shutdown'
return status
def pformat(
self,
privates: bool = False,
) -> str:
eps: list[Endpoint] = self._endpoints eps: list[Endpoint] = self._endpoints
state_repr: str = ( # state_repr: str = (
f'{len(eps)!r} IPC-endpoints active' # f'{len(eps)!r} endpoints active'
) # )
fmtstr = ( fmtstr = (
f' |_state: {state_repr}\n' f' |_state: {self.repr_state!r}\n'
f' no_more_peers: {self.has_peers()}\n'
) )
if self._shutdown is not None: if privates:
shutdown_stats: EventStatistics = self._shutdown.statistics() fmtstr += f' no_more_peers: {self.has_peers()}\n'
if self._shutdown is not None:
shutdown_stats: EventStatistics = self._shutdown.statistics()
fmtstr += (
f' task_waiting_on_shutdown: {shutdown_stats}\n'
)
if eps := self._endpoints:
addrs: list[tuple] = [
ep.addr for ep in eps
]
repr_eps: str = ppfmt(addrs)
fmtstr += ( fmtstr += (
f' task_waiting_on_shutdown: {shutdown_stats}\n' f' |_endpoints: {repr_eps}\n'
# ^TODO? how to indent closing ']'..
) )
fmtstr += ( if peers := self._peers:
# TODO, use the `ppfmt()` helper from `modden`! fmtstr += (
f' |_endpoints: {pformat(self._endpoints)}\n' f' |_peers: {len(peers)} connected\n'
f' |_peers: {len(self._peers)} connected\n' )
)
return ( return (
f'<IPCServer(\n' f'<Server(\n'
f'{fmtstr}' f'{fmtstr}'
f')>\n' f')>\n'
) )
@ -885,8 +990,8 @@ class Server(Struct):
) )
log.runtime( log.runtime(
f'Binding to endpoints for,\n' f'Binding endpoints\n'
f'{accept_addrs}\n' f'{ppfmt(accept_addrs)}\n'
) )
eps: list[Endpoint] = await self._parent_tn.start( eps: list[Endpoint] = await self._parent_tn.start(
partial( partial(
@ -896,13 +1001,19 @@ class Server(Struct):
listen_addrs=accept_addrs, listen_addrs=accept_addrs,
) )
) )
self._endpoints.extend(eps)
serv_repr: str = nest_from_op(
input_op='(>',
text=self.pformat(),
nest_indent=1,
)
log.runtime( log.runtime(
f'Started IPC endpoints\n' f'Started IPC server\n'
f'{eps}\n' f'{serv_repr}'
) )
self._endpoints.extend(eps) # XXX, a little sanity on new ep allocations
# XXX, just a little bit of sanity
group_tn: Nursery|None = None group_tn: Nursery|None = None
ep: Endpoint ep: Endpoint
for ep in eps: for ep in eps:
@ -956,9 +1067,13 @@ async def _serve_ipc_eps(
stream_handler_tn=stream_handler_tn, stream_handler_tn=stream_handler_tn,
) )
try: try:
ep_sclang: str = nest_from_op(
input_op='>[',
text=f'{ep.pformat()}',
)
log.runtime( log.runtime(
f'Starting new endpoint listener\n' f'Starting new endpoint listener\n'
f'{ep}\n' f'{ep_sclang}\n'
) )
listener: trio.abc.Listener = await ep.start_listener() listener: trio.abc.Listener = await ep.start_listener()
assert listener is ep._listener assert listener is ep._listener
@ -996,17 +1111,6 @@ async def _serve_ipc_eps(
handler_nursery=stream_handler_tn handler_nursery=stream_handler_tn
) )
) )
# TODO, wow make this message better! XD
log.runtime(
'Started server(s)\n'
+
'\n'.join([f'|_{addr}' for addr in listen_addrs])
)
log.runtime(
f'Started IPC endpoints\n'
f'{eps}\n'
)
task_status.started( task_status.started(
eps, eps,
) )
@ -1049,8 +1153,7 @@ async def open_ipc_server(
try: try:
yield ipc_server yield ipc_server
log.runtime( log.runtime(
f'Waiting on server to shutdown or be cancelled..\n' 'Server-tn running until terminated\n'
f'{ipc_server}'
) )
# TODO? when if ever would we want/need this? # TODO? when if ever would we want/need this?
# with trio.CancelScope(shield=True): # with trio.CancelScope(shield=True):

View File

@ -127,10 +127,9 @@ async def start_listener(
Start a TCP socket listener on the given `TCPAddress`. Start a TCP socket listener on the given `TCPAddress`.
''' '''
log.info( log.runtime(
f'Attempting to bind TCP socket\n' f'Trying socket bind\n'
f'>[\n' f'>[ {addr}\n'
f'|_{addr}\n'
) )
# ?TODO, maybe we should just change the lower-level call this is # ?TODO, maybe we should just change the lower-level call this is
# using internall per-listener? # using internall per-listener?
@ -145,11 +144,10 @@ async def start_listener(
assert len(listeners) == 1 assert len(listeners) == 1
listener = listeners[0] listener = listeners[0]
host, port = listener.socket.getsockname()[:2] host, port = listener.socket.getsockname()[:2]
bound_addr: TCPAddress = type(addr).from_addr((host, port))
log.info( log.info(
f'Listening on TCP socket\n' f'Listening on TCP socket\n'
f'[>\n' f'[> {bound_addr}\n'
f' |_{addr}\n'
) )
return listener return listener

View File

@ -81,10 +81,35 @@ BOLD_PALETTE = {
} }
def at_least_level(
log: Logger|LoggerAdapter,
level: int|str,
) -> bool:
'''
Predicate to test if a given level is active.
'''
if isinstance(level, str):
level: int = CUSTOM_LEVELS[level.upper()]
if log.getEffectiveLevel() <= level:
return True
return False
# TODO: this isn't showing the correct '{filename}' # TODO: this isn't showing the correct '{filename}'
# as it did before.. # as it did before..
class StackLevelAdapter(LoggerAdapter): class StackLevelAdapter(LoggerAdapter):
def at_least_level(
self,
level: str,
) -> bool:
return at_least_level(
log=self,
level=level,
)
def transport( def transport(
self, self,
msg: str, msg: str,
@ -401,19 +426,3 @@ def get_loglevel() -> str:
# global module logger for tractor itself # global module logger for tractor itself
log: StackLevelAdapter = get_logger('tractor') log: StackLevelAdapter = get_logger('tractor')
def at_least_level(
log: Logger|LoggerAdapter,
level: int|str,
) -> bool:
'''
Predicate to test if a given level is active.
'''
if isinstance(level, str):
level: int = CUSTOM_LEVELS[level.upper()]
if log.getEffectiveLevel() <= level:
return True
return False

View File

@ -177,6 +177,16 @@ class Aid(
f'{self.name}@{self.pid!r}' f'{self.name}@{self.pid!r}'
) )
# mk hashable via `.uuid`
def __hash__(self) -> int:
return hash(self.uuid)
def __eq__(self, other: Aid) -> bool:
return self.uuid == other.uuid
# use pretty fmt since often repr-ed for console/log
__repr__ = pretty_struct.Struct.__repr__
class SpawnSpec( class SpawnSpec(
pretty_struct.Struct, pretty_struct.Struct,

View File

@ -60,6 +60,9 @@ def find_masked_excs(
return None return None
# XXX, relevant ish discussion @ `trio`-core,
# https://github.com/python-trio/trio/issues/455#issuecomment-2785122216
#
@acm @acm
async def maybe_raise_from_masking_exc( async def maybe_raise_from_masking_exc(
tn: trio.Nursery|None = None, tn: trio.Nursery|None = None,