Compare commits
No commits in common. "c0058024c2cc9dabf160173a21d9cbb42b27e349" and "49c61e40c7abb90d0f619ab0b246b934071dea3d" have entirely different histories.
c0058024c2
...
49c61e40c7
|
@ -21,7 +21,7 @@ Sub-process entry points.
|
|||
from __future__ import annotations
|
||||
from functools import partial
|
||||
import multiprocessing as mp
|
||||
# import os
|
||||
import os
|
||||
from typing import (
|
||||
Any,
|
||||
TYPE_CHECKING,
|
||||
|
@ -38,7 +38,6 @@ from .devx import (
|
|||
_frame_stack,
|
||||
pformat,
|
||||
)
|
||||
# from .msg import pretty_struct
|
||||
from .to_asyncio import run_as_asyncio_guest
|
||||
from ._addr import UnwrappedAddress
|
||||
from ._runtime import (
|
||||
|
@ -128,13 +127,20 @@ def _trio_main(
|
|||
|
||||
if actor.loglevel is not None:
|
||||
get_console_log(actor.loglevel)
|
||||
actor_info: str = (
|
||||
f'|_{actor}\n'
|
||||
f' uid: {actor.uid}\n'
|
||||
f' pid: {os.getpid()}\n'
|
||||
f' parent_addr: {parent_addr}\n'
|
||||
f' loglevel: {actor.loglevel}\n'
|
||||
)
|
||||
log.info(
|
||||
f'Starting `trio` subactor from parent @ '
|
||||
f'{parent_addr}\n'
|
||||
'Starting new `trio` subactor\n'
|
||||
+
|
||||
pformat.nest_from_op(
|
||||
input_op='>(', # see syntax ideas above
|
||||
text=f'{actor}',
|
||||
text=actor_info,
|
||||
nest_indent=2, # since "complete"
|
||||
)
|
||||
)
|
||||
logmeth = log.info
|
||||
|
@ -143,7 +149,7 @@ def _trio_main(
|
|||
+
|
||||
pformat.nest_from_op(
|
||||
input_op=')>', # like a "closed-to-play"-icon from super perspective
|
||||
text=f'{actor}',
|
||||
text=actor_info,
|
||||
nest_indent=1,
|
||||
)
|
||||
)
|
||||
|
@ -161,7 +167,7 @@ def _trio_main(
|
|||
+
|
||||
pformat.nest_from_op(
|
||||
input_op='c)>', # closed due to cancel (see above)
|
||||
text=f'{actor}',
|
||||
text=actor_info,
|
||||
)
|
||||
)
|
||||
except BaseException as err:
|
||||
|
@ -171,7 +177,7 @@ def _trio_main(
|
|||
+
|
||||
pformat.nest_from_op(
|
||||
input_op='x)>', # closed by error
|
||||
text=f'{actor}',
|
||||
text=actor_info,
|
||||
)
|
||||
)
|
||||
# NOTE since we raise a tb will already be shown on the
|
||||
|
|
101
tractor/_rpc.py
101
tractor/_rpc.py
|
@ -64,7 +64,6 @@ from .trionics import (
|
|||
from .devx import (
|
||||
debug,
|
||||
add_div,
|
||||
pformat as _pformat,
|
||||
)
|
||||
from . import _state
|
||||
from .log import get_logger
|
||||
|
@ -73,7 +72,7 @@ from .msg import (
|
|||
MsgCodec,
|
||||
PayloadT,
|
||||
NamespacePath,
|
||||
pretty_struct,
|
||||
# pretty_struct,
|
||||
_ops as msgops,
|
||||
)
|
||||
from tractor.msg.types import (
|
||||
|
@ -221,18 +220,11 @@ async def _invoke_non_context(
|
|||
task_status.started(ctx)
|
||||
result = await coro
|
||||
fname: str = func.__name__
|
||||
|
||||
op_nested_task: str = _pformat.nest_from_op(
|
||||
input_op=f')> cid: {ctx.cid!r}',
|
||||
text=f'{ctx._task}',
|
||||
nest_indent=1, # under >
|
||||
)
|
||||
log.runtime(
|
||||
f'RPC task complete\n'
|
||||
f'\n'
|
||||
f'{op_nested_task}\n'
|
||||
f'\n'
|
||||
f')> {fname}() -> {pformat(result)}\n'
|
||||
'RPC complete:\n'
|
||||
f'task: {ctx._task}\n'
|
||||
f'|_cid={ctx.cid}\n'
|
||||
f'|_{fname}() -> {pformat(result)}\n'
|
||||
)
|
||||
|
||||
# NOTE: only send result if we know IPC isn't down
|
||||
|
@ -1051,7 +1043,7 @@ async def process_messages(
|
|||
):
|
||||
target_cid: str = kwargs['cid']
|
||||
kwargs |= {
|
||||
'requesting_aid': chan.aid,
|
||||
'requesting_uid': chan.uid,
|
||||
'ipc_msg': msg,
|
||||
|
||||
# XXX NOTE! ONLY the rpc-task-owning
|
||||
|
@ -1087,34 +1079,21 @@ async def process_messages(
|
|||
ns=ns,
|
||||
func=funcname,
|
||||
kwargs=kwargs, # type-spec this? see `msg.types`
|
||||
uid=actor_uuid,
|
||||
uid=actorid,
|
||||
):
|
||||
if actor_uuid != chan.aid.uid:
|
||||
raise RuntimeError(
|
||||
f'IPC <Start> msg <-> chan.aid mismatch!?\n'
|
||||
f'Channel.aid = {chan.aid!r}\n'
|
||||
f'Start.uid = {actor_uuid!r}\n'
|
||||
)
|
||||
# await debug.pause()
|
||||
op_repr: str = 'Start <=) '
|
||||
req_repr: str = _pformat.nest_from_op(
|
||||
input_op=op_repr,
|
||||
op_suffix='',
|
||||
nest_prefix='',
|
||||
text=f'{chan}',
|
||||
|
||||
nest_indent=len(op_repr)-1,
|
||||
rm_from_first_ln='<',
|
||||
# ^XXX, subtract -1 to account for
|
||||
# <Channel
|
||||
# ^_chevron to be stripped
|
||||
)
|
||||
start_status: str = (
|
||||
'Handling RPC request\n'
|
||||
f'{req_repr}\n'
|
||||
f'\n'
|
||||
f'->{{ ipc-context-id: {cid!r}\n'
|
||||
f'->{{ nsp for fn: `{ns}.{funcname}({kwargs})`\n'
|
||||
'Handling RPC `Start` request\n'
|
||||
f'<= peer: {actorid}\n\n'
|
||||
f' |_{chan}\n'
|
||||
f' |_cid: {cid}\n\n'
|
||||
# f' |_{ns}.{funcname}({kwargs})\n'
|
||||
f'>> {actor.uid}\n'
|
||||
f' |_{actor}\n'
|
||||
f' -> nsp: `{ns}.{funcname}({kwargs})`\n'
|
||||
|
||||
# f' |_{ns}.{funcname}({kwargs})\n\n'
|
||||
|
||||
# f'{pretty_struct.pformat(msg)}\n'
|
||||
)
|
||||
|
||||
# runtime-internal endpoint: `Actor.<funcname>`
|
||||
|
@ -1143,6 +1122,10 @@ async def process_messages(
|
|||
await chan.send(err_msg)
|
||||
continue
|
||||
|
||||
start_status += (
|
||||
f' -> func: {func}\n'
|
||||
)
|
||||
|
||||
# schedule a task for the requested RPC function
|
||||
# in the actor's main "service nursery".
|
||||
#
|
||||
|
@ -1150,7 +1133,7 @@ async def process_messages(
|
|||
# supervision isolation? would avoid having to
|
||||
# manage RPC tasks individually in `._rpc_tasks`
|
||||
# table?
|
||||
start_status += '->( scheduling new task..\n'
|
||||
start_status += ' -> scheduling new task..\n'
|
||||
log.runtime(start_status)
|
||||
try:
|
||||
ctx: Context = await actor._service_n.start(
|
||||
|
@ -1239,7 +1222,7 @@ async def process_messages(
|
|||
f'|_{chan}\n'
|
||||
)
|
||||
await actor.cancel_rpc_tasks(
|
||||
req_aid=actor.aid,
|
||||
req_uid=actor.uid,
|
||||
# a "self cancel" in terms of the lifetime of the
|
||||
# IPC connection which is presumed to be the
|
||||
# source of any requests for spawned tasks.
|
||||
|
@ -1311,37 +1294,13 @@ async def process_messages(
|
|||
finally:
|
||||
# msg debugging for when he machinery is brokey
|
||||
if msg is None:
|
||||
message: str = 'Exiting RPC-loop without receiving a msg?'
|
||||
message: str = 'Exiting IPC msg loop without receiving a msg?'
|
||||
else:
|
||||
task_op_repr: str = ')>'
|
||||
task: trio.Task = trio.lowlevel.current_task()
|
||||
|
||||
# maybe add cancelled opt prefix
|
||||
if task._cancel_status.effectively_cancelled:
|
||||
task_op_repr = 'c' + task_op_repr
|
||||
|
||||
task_repr: str = _pformat.nest_from_op(
|
||||
input_op=task_op_repr,
|
||||
text=f'{task!r}',
|
||||
nest_indent=1,
|
||||
)
|
||||
# chan_op_repr: str = '<=} '
|
||||
# chan_repr: str = _pformat.nest_from_op(
|
||||
# input_op=chan_op_repr,
|
||||
# op_suffix='',
|
||||
# nest_prefix='',
|
||||
# text=chan.pformat(),
|
||||
# nest_indent=len(chan_op_repr)-1,
|
||||
# rm_from_first_ln='<',
|
||||
# )
|
||||
message: str = (
|
||||
f'Exiting RPC-loop with final msg\n'
|
||||
f'\n'
|
||||
# f'{chan_repr}\n'
|
||||
f'{task_repr}\n'
|
||||
f'\n'
|
||||
f'{pretty_struct.pformat(msg)}'
|
||||
f'\n'
|
||||
'Exiting IPC msg loop with final msg\n\n'
|
||||
f'<= peer: {chan.uid}\n'
|
||||
f' |_{chan}\n\n'
|
||||
# f'{pretty_struct.pformat(msg)}'
|
||||
)
|
||||
|
||||
log.runtime(message)
|
||||
|
|
|
@ -262,7 +262,6 @@ def nest_from_op(
|
|||
nest_indent: int|None = None,
|
||||
# XXX indent `next_prefix` "to-the-right-of" `input_op`
|
||||
# by this count of whitespaces (' ').
|
||||
rm_from_first_ln: str|None = None,
|
||||
|
||||
) -> str:
|
||||
'''
|
||||
|
@ -347,35 +346,20 @@ def nest_from_op(
|
|||
if (
|
||||
nest_prefix
|
||||
and
|
||||
nest_indent != 0
|
||||
nest_indent
|
||||
):
|
||||
if nest_indent is not None:
|
||||
nest_prefix: str = textwrap.indent(
|
||||
nest_prefix,
|
||||
prefix=nest_indent*' ',
|
||||
)
|
||||
nest_indent: int = len(nest_prefix)
|
||||
|
||||
# determine body-text indent either by,
|
||||
# - using wtv explicit indent value is provided,
|
||||
# OR
|
||||
# - auto-calcing the indent to embed `text` under
|
||||
# the `nest_prefix` if provided, **IFF** `nest_indent=None`.
|
||||
tree_str_indent: int = 0
|
||||
if nest_indent not in {0, None}:
|
||||
tree_str_indent = nest_indent
|
||||
elif (
|
||||
nest_prefix
|
||||
and
|
||||
nest_indent != 0
|
||||
):
|
||||
tree_str_indent = len(nest_prefix)
|
||||
nest_prefix: str = textwrap.indent(
|
||||
nest_prefix,
|
||||
prefix=nest_indent*' ',
|
||||
)
|
||||
|
||||
indented_tree_str: str = text
|
||||
if tree_str_indent:
|
||||
tree_str_indent: int = 0
|
||||
if nest_indent != 0:
|
||||
tree_str_indent: int = len(nest_prefix)
|
||||
indented_tree_str: str = textwrap.indent(
|
||||
text,
|
||||
prefix=' '*tree_str_indent,
|
||||
prefix=' '*tree_str_indent
|
||||
)
|
||||
|
||||
# inject any provided nesting-prefix chars
|
||||
|
@ -385,35 +369,18 @@ def nest_from_op(
|
|||
f'{nest_prefix}{indented_tree_str[tree_str_indent:]}'
|
||||
)
|
||||
|
||||
if (
|
||||
not prefix_op
|
||||
or
|
||||
rm_from_first_ln
|
||||
):
|
||||
tree_lns: list[str] = indented_tree_str.splitlines()
|
||||
first: str = tree_lns[0]
|
||||
if rm_from_first_ln:
|
||||
first = first.strip().replace(
|
||||
rm_from_first_ln,
|
||||
'',
|
||||
)
|
||||
indented_tree_str: str = '\n'.join(tree_lns[1:])
|
||||
|
||||
if prefix_op:
|
||||
indented_tree_str = (
|
||||
f'{first}\n'
|
||||
f'{indented_tree_str}'
|
||||
)
|
||||
|
||||
if prefix_op:
|
||||
return (
|
||||
f'{input_op}{op_suffix}'
|
||||
f'{indented_tree_str}'
|
||||
)
|
||||
else:
|
||||
tree_lns: list[str] = indented_tree_str.splitlines()
|
||||
first: str = tree_lns[0]
|
||||
rest: str = '\n'.join(tree_lns[1:])
|
||||
return (
|
||||
f'{first}{input_op}{op_suffix}'
|
||||
f'{indented_tree_str}'
|
||||
f'{rest}'
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -171,23 +171,11 @@ class Channel:
|
|||
)
|
||||
assert transport.raddr == addr
|
||||
chan = Channel(transport=transport)
|
||||
|
||||
# ?TODO, compact this into adapter level-methods?
|
||||
# -[ ] would avoid extra repr-calcs if level not active?
|
||||
# |_ how would the `calc_if_level` look though? func?
|
||||
if log.at_least_level('runtime'):
|
||||
from tractor.devx import (
|
||||
pformat as _pformat,
|
||||
)
|
||||
chan_repr: str = _pformat.nest_from_op(
|
||||
input_op='[>',
|
||||
text=chan.pformat(),
|
||||
nest_indent=1,
|
||||
)
|
||||
log.runtime(
|
||||
f'Connected channel IPC transport\n'
|
||||
f'{chan_repr}'
|
||||
)
|
||||
log.runtime(
|
||||
f'Connected channel IPC transport\n'
|
||||
f'[>\n'
|
||||
f' |_{chan}\n'
|
||||
)
|
||||
return chan
|
||||
|
||||
@cm
|
||||
|
@ -230,19 +218,17 @@ class Channel:
|
|||
if privates else ''
|
||||
) + ( # peer-actor (processs) section
|
||||
f' |_peer: {self.aid.reprol()!r}\n'
|
||||
if self.aid else ' |_peer: <unknown>\n'
|
||||
if self.aid else '<unknown>'
|
||||
) + (
|
||||
f' |_msgstream: {tpt_name}\n'
|
||||
f' maddr: {tpt.maddr!r}\n'
|
||||
f' proto: {tpt.laddr.proto_key!r}\n'
|
||||
f' layer: {tpt.layer_key!r}\n'
|
||||
f' codec: {tpt.codec_key!r}\n'
|
||||
f' .laddr={tpt.laddr}\n'
|
||||
f' .raddr={tpt.raddr}\n'
|
||||
) + (
|
||||
f' ._transport.stream={tpt.stream}\n'
|
||||
f' ._transport.drained={tpt.drained}\n'
|
||||
if privates else ''
|
||||
f' proto={tpt.laddr.proto_key!r}\n'
|
||||
f' layer={tpt.layer_key!r}\n'
|
||||
f' laddr={tpt.laddr}\n'
|
||||
f' raddr={tpt.raddr}\n'
|
||||
f' codec={tpt.codec_key!r}\n'
|
||||
f' stream={tpt.stream}\n'
|
||||
f' maddr={tpt.maddr!r}\n'
|
||||
f' drained={tpt.drained}\n'
|
||||
) + (
|
||||
f' _send_lock={tpt._send_lock.statistics()}\n'
|
||||
if privates else ''
|
||||
|
@ -271,10 +257,6 @@ class Channel:
|
|||
def raddr(self) -> Address|None:
|
||||
return self._transport.raddr if self._transport else None
|
||||
|
||||
@property
|
||||
def maddr(self) -> str:
|
||||
return self._transport.maddr if self._transport else '<no-tpt>'
|
||||
|
||||
# TODO: something like,
|
||||
# `pdbp.hideframe_on(errors=[MsgTypeError])`
|
||||
# instead of the `try/except` hack we have rn..
|
||||
|
@ -462,8 +444,8 @@ class Channel:
|
|||
await self.send(aid)
|
||||
peer_aid: Aid = await self.recv()
|
||||
log.runtime(
|
||||
f'Received hanshake with peer\n'
|
||||
f'<= {peer_aid.reprol(sin_uuid=False)}\n'
|
||||
f'Received hanshake with peer actor,\n'
|
||||
f'{peer_aid}\n'
|
||||
)
|
||||
# NOTE, we always are referencing the remote peer!
|
||||
self.aid = peer_aid
|
||||
|
|
|
@ -17,16 +17,9 @@
|
|||
Utils to tame mp non-SC madeness
|
||||
|
||||
'''
|
||||
|
||||
# !TODO! in 3.13 this can be disabled (the-same/similarly) using
|
||||
# a flag,
|
||||
# - [ ] soo if it works like this, drop this module entirely for
|
||||
# 3.13+ B)
|
||||
# |_https://docs.python.org/3/library/multiprocessing.shared_memory.html
|
||||
#
|
||||
def disable_mantracker():
|
||||
'''
|
||||
Disable all `multiprocessing` "resource tracking" machinery since
|
||||
Disable all ``multiprocessing``` "resource tracking" machinery since
|
||||
it's an absolute multi-threaded mess of non-SC madness.
|
||||
|
||||
'''
|
||||
|
|
|
@ -26,7 +26,7 @@ from contextlib import (
|
|||
from functools import partial
|
||||
from itertools import chain
|
||||
import inspect
|
||||
import textwrap
|
||||
from pprint import pformat
|
||||
from types import (
|
||||
ModuleType,
|
||||
)
|
||||
|
@ -43,10 +43,7 @@ from trio import (
|
|||
SocketListener,
|
||||
)
|
||||
|
||||
from ..devx.pformat import (
|
||||
ppfmt,
|
||||
nest_from_op,
|
||||
)
|
||||
# from ..devx import debug
|
||||
from .._exceptions import (
|
||||
TransportClosed,
|
||||
)
|
||||
|
@ -144,8 +141,9 @@ async def maybe_wait_on_canced_subs(
|
|||
|
||||
):
|
||||
log.cancel(
|
||||
'Waiting on cancel request to peer\n'
|
||||
f'c)=> {chan.aid.reprol()}@[{chan.maddr}]\n'
|
||||
'Waiting on cancel request to peer..\n'
|
||||
f'c)=>\n'
|
||||
f' |_{chan.aid}\n'
|
||||
)
|
||||
|
||||
# XXX: this is a soft wait on the channel (and its
|
||||
|
@ -181,7 +179,7 @@ async def maybe_wait_on_canced_subs(
|
|||
log.warning(
|
||||
'Draining msg from disconnected peer\n'
|
||||
f'{chan_info}'
|
||||
f'{ppfmt(msg)}\n'
|
||||
f'{pformat(msg)}\n'
|
||||
)
|
||||
# cid: str|None = msg.get('cid')
|
||||
cid: str|None = msg.cid
|
||||
|
@ -250,7 +248,7 @@ async def maybe_wait_on_canced_subs(
|
|||
if children := local_nursery._children:
|
||||
# indent from above local-nurse repr
|
||||
report += (
|
||||
f' |_{ppfmt(children)}\n'
|
||||
f' |_{pformat(children)}\n'
|
||||
)
|
||||
|
||||
log.warning(report)
|
||||
|
@ -281,9 +279,8 @@ async def maybe_wait_on_canced_subs(
|
|||
log.runtime(
|
||||
f'Peer IPC broke but subproc is alive?\n\n'
|
||||
|
||||
f'<=x {chan.aid.reprol()}@[{chan.maddr}]\n'
|
||||
f'\n'
|
||||
f'{proc}\n'
|
||||
f'<=x {chan.aid}@{chan.raddr}\n'
|
||||
f' |_{proc}\n'
|
||||
)
|
||||
|
||||
return local_nursery
|
||||
|
@ -327,10 +324,9 @@ async def handle_stream_from_peer(
|
|||
|
||||
chan = Channel.from_stream(stream)
|
||||
con_status: str = (
|
||||
f'New inbound IPC transport connection\n'
|
||||
f'<=( {stream!r}\n'
|
||||
'New inbound IPC connection <=\n'
|
||||
f'|_{chan}\n'
|
||||
)
|
||||
con_status_steps: str = ''
|
||||
|
||||
# initial handshake with peer phase
|
||||
try:
|
||||
|
@ -376,7 +372,7 @@ async def handle_stream_from_peer(
|
|||
if _pre_chan := server._peers.get(uid):
|
||||
familiar: str = 'pre-existing-peer'
|
||||
uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
|
||||
con_status_steps += (
|
||||
con_status += (
|
||||
f' -> Handshake with {familiar} `{uid_short}` complete\n'
|
||||
)
|
||||
|
||||
|
@ -401,7 +397,7 @@ async def handle_stream_from_peer(
|
|||
None,
|
||||
)
|
||||
if event:
|
||||
con_status_steps += (
|
||||
con_status += (
|
||||
' -> Waking subactor spawn waiters: '
|
||||
f'{event.statistics().tasks_waiting}\n'
|
||||
f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
|
||||
|
@ -412,7 +408,7 @@ async def handle_stream_from_peer(
|
|||
event.set()
|
||||
|
||||
else:
|
||||
con_status_steps += (
|
||||
con_status += (
|
||||
f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
|
||||
) # type: ignore
|
||||
|
||||
|
@ -426,15 +422,8 @@ async def handle_stream_from_peer(
|
|||
# TODO: can we just use list-ref directly?
|
||||
chans.append(chan)
|
||||
|
||||
con_status_steps += ' -> Entering RPC msg loop..\n'
|
||||
log.runtime(
|
||||
con_status
|
||||
+
|
||||
textwrap.indent(
|
||||
con_status_steps,
|
||||
prefix=' '*3, # align to first-ln
|
||||
)
|
||||
)
|
||||
con_status += ' -> Entering RPC msg loop..\n'
|
||||
log.runtime(con_status)
|
||||
|
||||
# Begin channel management - respond to remote requests and
|
||||
# process received reponses.
|
||||
|
@ -467,67 +456,41 @@ async def handle_stream_from_peer(
|
|||
disconnected=disconnected,
|
||||
)
|
||||
|
||||
# `Channel` teardown and closure sequence
|
||||
# ``Channel`` teardown and closure sequence
|
||||
# drop ref to channel so it can be gc-ed and disconnected
|
||||
#
|
||||
# -[x]TODO mk this be like
|
||||
# <=x Channel(
|
||||
# |_field: blah
|
||||
# )>
|
||||
op_repr: str = '<=x '
|
||||
chan_repr: str = nest_from_op(
|
||||
input_op=op_repr,
|
||||
op_suffix='',
|
||||
nest_prefix='',
|
||||
text=chan.pformat(),
|
||||
nest_indent=len(op_repr)-1,
|
||||
rm_from_first_ln='<',
|
||||
)
|
||||
|
||||
con_teardown_status: str = (
|
||||
f'IPC channel disconnect\n'
|
||||
f'\n'
|
||||
f'{chan_repr}\n'
|
||||
f'\n'
|
||||
f'IPC channel disconnected:\n'
|
||||
f'<=x uid: {chan.aid}\n'
|
||||
f' |_{pformat(chan)}\n\n'
|
||||
)
|
||||
|
||||
chans.remove(chan)
|
||||
|
||||
# TODO: do we need to be this pedantic?
|
||||
if not chans:
|
||||
con_teardown_status += (
|
||||
f'-> No more channels with {chan.aid.reprol()!r}\n'
|
||||
f'-> No more channels with {chan.aid}'
|
||||
)
|
||||
server._peers.pop(uid, None)
|
||||
|
||||
if peers := list(server._peers.values()):
|
||||
peer_cnt: int = len(peers)
|
||||
if (
|
||||
(first := peers[0][0]) is not chan
|
||||
and
|
||||
not disconnected
|
||||
and
|
||||
peer_cnt > 1
|
||||
):
|
||||
con_teardown_status += (
|
||||
f'-> Remaining IPC {peer_cnt-1!r} peers:\n'
|
||||
peers_str: str = ''
|
||||
for uid, chans in server._peers.items():
|
||||
peers_str += (
|
||||
f'uid: {uid}\n'
|
||||
)
|
||||
for i, chan in enumerate(chans):
|
||||
peers_str += (
|
||||
f' |_[{i}] {pformat(chan)}\n'
|
||||
)
|
||||
for chans in server._peers.values():
|
||||
first: Channel = chans[0]
|
||||
if not (
|
||||
first is chan
|
||||
and
|
||||
disconnected
|
||||
):
|
||||
con_teardown_status += (
|
||||
f' |_{first.aid.reprol()!r} -> {len(chans)!r} chans\n'
|
||||
)
|
||||
|
||||
con_teardown_status += (
|
||||
f'-> Remaining IPC {len(server._peers)} peers: {peers_str}\n'
|
||||
)
|
||||
|
||||
# No more channels to other actors (at all) registered
|
||||
# as connected.
|
||||
if not server._peers:
|
||||
con_teardown_status += (
|
||||
'-> Signalling no more peer connections!\n'
|
||||
'Signalling no more peer channel connections'
|
||||
)
|
||||
server._no_more_peers.set()
|
||||
|
||||
|
@ -616,10 +579,10 @@ async def handle_stream_from_peer(
|
|||
|
||||
class Endpoint(Struct):
|
||||
'''
|
||||
An instance of an IPC "bound" address where the lifetime of an
|
||||
"ability to accept connections" and handle the subsequent
|
||||
sequence-of-packets (maybe oriented as sessions) is determined by
|
||||
the underlying nursery scope(s).
|
||||
An instance of an IPC "bound" address where the lifetime of the
|
||||
"ability to accept connections" (from clients) and then handle
|
||||
those inbound sessions or sequences-of-packets is determined by
|
||||
a (maybe pair of) nurser(y/ies).
|
||||
|
||||
'''
|
||||
addr: Address
|
||||
|
@ -637,24 +600,6 @@ class Endpoint(Struct):
|
|||
MsgTransport, # handle to encoded-msg transport stream
|
||||
] = {}
|
||||
|
||||
def pformat(
|
||||
self,
|
||||
indent: int = 0,
|
||||
privates: bool = False,
|
||||
) -> str:
|
||||
type_repr: str = type(self).__name__
|
||||
fmtstr: str = (
|
||||
# !TODO, always be ns aware!
|
||||
# f'|_netns: {netns}\n'
|
||||
f' |.addr: {self.addr!r}\n'
|
||||
f' |_peers: {len(self.peer_tpts)}\n'
|
||||
)
|
||||
return (
|
||||
f'<{type_repr}(\n'
|
||||
f'{fmtstr}'
|
||||
f')>'
|
||||
)
|
||||
|
||||
async def start_listener(self) -> SocketListener:
|
||||
tpt_mod: ModuleType = inspect.getmodule(self.addr)
|
||||
lstnr: SocketListener = await tpt_mod.start_listener(
|
||||
|
@ -694,13 +639,11 @@ class Endpoint(Struct):
|
|||
class Server(Struct):
|
||||
_parent_tn: Nursery
|
||||
_stream_handler_tn: Nursery
|
||||
|
||||
# level-triggered sig for whether "no peers are currently
|
||||
# connected"; field is **always** set to an instance but
|
||||
# initialized with `.is_set() == True`.
|
||||
_no_more_peers: trio.Event
|
||||
|
||||
# active eps as allocated by `.listen_on()`
|
||||
_endpoints: list[Endpoint] = []
|
||||
|
||||
# connection tracking & mgmt
|
||||
|
@ -708,19 +651,12 @@ class Server(Struct):
|
|||
str, # uaid
|
||||
list[Channel], # IPC conns from peer
|
||||
] = defaultdict(list)
|
||||
|
||||
# events-table with entries registered unset while the local
|
||||
# actor is waiting on a new actor to inbound connect, often
|
||||
# a parent waiting on its child just after spawn.
|
||||
_peer_connected: dict[
|
||||
tuple[str, str],
|
||||
trio.Event,
|
||||
] = {}
|
||||
|
||||
# syncs for setup/teardown sequences
|
||||
# - null when not yet booted,
|
||||
# - unset when active,
|
||||
# - set when fully shutdown with 0 eps active.
|
||||
_shutdown: trio.Event|None = None
|
||||
|
||||
# TODO, maybe just make `._endpoints: list[Endpoint]` and
|
||||
|
@ -728,6 +664,7 @@ class Server(Struct):
|
|||
# @property
|
||||
# def addrs2eps(self) -> dict[Address, Endpoint]:
|
||||
# ...
|
||||
|
||||
@property
|
||||
def proto_keys(self) -> list[str]:
|
||||
return [
|
||||
|
@ -753,7 +690,7 @@ class Server(Struct):
|
|||
# TODO: obvi a different server type when we eventually
|
||||
# support some others XD
|
||||
log.runtime(
|
||||
f'Cancelling server(s) for tpt-protos\n'
|
||||
f'Cancelling server(s) for\n'
|
||||
f'{self.proto_keys!r}\n'
|
||||
)
|
||||
self._parent_tn.cancel_scope.cancel()
|
||||
|
@ -780,14 +717,6 @@ class Server(Struct):
|
|||
f'protos: {tpt_protos!r}\n'
|
||||
)
|
||||
|
||||
def len_peers(
|
||||
self,
|
||||
) -> int:
|
||||
return len([
|
||||
chan.connected()
|
||||
for chan in chain(*self._peers.values())
|
||||
])
|
||||
|
||||
def has_peers(
|
||||
self,
|
||||
check_chans: bool = False,
|
||||
|
@ -801,11 +730,13 @@ class Server(Struct):
|
|||
has_peers
|
||||
and
|
||||
check_chans
|
||||
and
|
||||
(peer_cnt := self.len_peers())
|
||||
):
|
||||
has_peers: bool = (
|
||||
peer_cnt > 0
|
||||
any(chan.connected()
|
||||
for chan in chain(
|
||||
*self._peers.values()
|
||||
)
|
||||
)
|
||||
and
|
||||
has_peers
|
||||
)
|
||||
|
@ -872,66 +803,30 @@ class Server(Struct):
|
|||
|
||||
return ev.is_set()
|
||||
|
||||
@property
|
||||
def repr_state(self) -> str:
|
||||
'''
|
||||
A `str`-status describing the current state of this
|
||||
IPC server in terms of the current operating "phase".
|
||||
|
||||
'''
|
||||
status = 'server is active'
|
||||
if self.has_peers():
|
||||
peer_cnt: int = self.len_peers()
|
||||
status: str = (
|
||||
f'{peer_cnt!r} peer chans'
|
||||
)
|
||||
else:
|
||||
status: str = 'No peer chans'
|
||||
|
||||
if self.is_shutdown():
|
||||
status: str = 'server-shutdown'
|
||||
|
||||
return status
|
||||
|
||||
def pformat(
|
||||
self,
|
||||
privates: bool = False,
|
||||
) -> str:
|
||||
def pformat(self) -> str:
|
||||
eps: list[Endpoint] = self._endpoints
|
||||
|
||||
# state_repr: str = (
|
||||
# f'{len(eps)!r} endpoints active'
|
||||
# )
|
||||
fmtstr = (
|
||||
f' |_state: {self.repr_state!r}\n'
|
||||
state_repr: str = (
|
||||
f'{len(eps)!r} IPC-endpoints active'
|
||||
)
|
||||
if privates:
|
||||
fmtstr += f' no_more_peers: {self.has_peers()}\n'
|
||||
|
||||
if self._shutdown is not None:
|
||||
shutdown_stats: EventStatistics = self._shutdown.statistics()
|
||||
fmtstr += (
|
||||
f' task_waiting_on_shutdown: {shutdown_stats}\n'
|
||||
)
|
||||
|
||||
if eps := self._endpoints:
|
||||
addrs: list[tuple] = [
|
||||
ep.addr for ep in eps
|
||||
]
|
||||
repr_eps: str = ppfmt(addrs)
|
||||
|
||||
fmtstr = (
|
||||
f' |_state: {state_repr}\n'
|
||||
f' no_more_peers: {self.has_peers()}\n'
|
||||
)
|
||||
if self._shutdown is not None:
|
||||
shutdown_stats: EventStatistics = self._shutdown.statistics()
|
||||
fmtstr += (
|
||||
f' |_endpoints: {repr_eps}\n'
|
||||
# ^TODO? how to indent closing ']'..
|
||||
f' task_waiting_on_shutdown: {shutdown_stats}\n'
|
||||
)
|
||||
|
||||
if peers := self._peers:
|
||||
fmtstr += (
|
||||
f' |_peers: {len(peers)} connected\n'
|
||||
)
|
||||
fmtstr += (
|
||||
# TODO, use the `ppfmt()` helper from `modden`!
|
||||
f' |_endpoints: {pformat(self._endpoints)}\n'
|
||||
f' |_peers: {len(self._peers)} connected\n'
|
||||
)
|
||||
|
||||
return (
|
||||
f'<Server(\n'
|
||||
f'<IPCServer(\n'
|
||||
f'{fmtstr}'
|
||||
f')>\n'
|
||||
)
|
||||
|
@ -990,8 +885,8 @@ class Server(Struct):
|
|||
)
|
||||
|
||||
log.runtime(
|
||||
f'Binding endpoints\n'
|
||||
f'{ppfmt(accept_addrs)}\n'
|
||||
f'Binding to endpoints for,\n'
|
||||
f'{accept_addrs}\n'
|
||||
)
|
||||
eps: list[Endpoint] = await self._parent_tn.start(
|
||||
partial(
|
||||
|
@ -1001,19 +896,13 @@ class Server(Struct):
|
|||
listen_addrs=accept_addrs,
|
||||
)
|
||||
)
|
||||
self._endpoints.extend(eps)
|
||||
|
||||
serv_repr: str = nest_from_op(
|
||||
input_op='(>',
|
||||
text=self.pformat(),
|
||||
nest_indent=1,
|
||||
)
|
||||
log.runtime(
|
||||
f'Started IPC server\n'
|
||||
f'{serv_repr}'
|
||||
f'Started IPC endpoints\n'
|
||||
f'{eps}\n'
|
||||
)
|
||||
|
||||
# XXX, a little sanity on new ep allocations
|
||||
self._endpoints.extend(eps)
|
||||
# XXX, just a little bit of sanity
|
||||
group_tn: Nursery|None = None
|
||||
ep: Endpoint
|
||||
for ep in eps:
|
||||
|
@ -1067,13 +956,9 @@ async def _serve_ipc_eps(
|
|||
stream_handler_tn=stream_handler_tn,
|
||||
)
|
||||
try:
|
||||
ep_sclang: str = nest_from_op(
|
||||
input_op='>[',
|
||||
text=f'{ep.pformat()}',
|
||||
)
|
||||
log.runtime(
|
||||
f'Starting new endpoint listener\n'
|
||||
f'{ep_sclang}\n'
|
||||
f'{ep}\n'
|
||||
)
|
||||
listener: trio.abc.Listener = await ep.start_listener()
|
||||
assert listener is ep._listener
|
||||
|
@ -1111,6 +996,17 @@ async def _serve_ipc_eps(
|
|||
handler_nursery=stream_handler_tn
|
||||
)
|
||||
)
|
||||
# TODO, wow make this message better! XD
|
||||
log.runtime(
|
||||
'Started server(s)\n'
|
||||
+
|
||||
'\n'.join([f'|_{addr}' for addr in listen_addrs])
|
||||
)
|
||||
|
||||
log.runtime(
|
||||
f'Started IPC endpoints\n'
|
||||
f'{eps}\n'
|
||||
)
|
||||
task_status.started(
|
||||
eps,
|
||||
)
|
||||
|
@ -1153,7 +1049,8 @@ async def open_ipc_server(
|
|||
try:
|
||||
yield ipc_server
|
||||
log.runtime(
|
||||
'Server-tn running until terminated\n'
|
||||
f'Waiting on server to shutdown or be cancelled..\n'
|
||||
f'{ipc_server}'
|
||||
)
|
||||
# TODO? when if ever would we want/need this?
|
||||
# with trio.CancelScope(shield=True):
|
||||
|
|
|
@ -127,9 +127,10 @@ async def start_listener(
|
|||
Start a TCP socket listener on the given `TCPAddress`.
|
||||
|
||||
'''
|
||||
log.runtime(
|
||||
f'Trying socket bind\n'
|
||||
f'>[ {addr}\n'
|
||||
log.info(
|
||||
f'Attempting to bind TCP socket\n'
|
||||
f'>[\n'
|
||||
f'|_{addr}\n'
|
||||
)
|
||||
# ?TODO, maybe we should just change the lower-level call this is
|
||||
# using internall per-listener?
|
||||
|
@ -144,10 +145,11 @@ async def start_listener(
|
|||
assert len(listeners) == 1
|
||||
listener = listeners[0]
|
||||
host, port = listener.socket.getsockname()[:2]
|
||||
bound_addr: TCPAddress = type(addr).from_addr((host, port))
|
||||
|
||||
log.info(
|
||||
f'Listening on TCP socket\n'
|
||||
f'[> {bound_addr}\n'
|
||||
f'[>\n'
|
||||
f' |_{addr}\n'
|
||||
)
|
||||
return listener
|
||||
|
||||
|
|
|
@ -81,35 +81,10 @@ BOLD_PALETTE = {
|
|||
}
|
||||
|
||||
|
||||
def at_least_level(
|
||||
log: Logger|LoggerAdapter,
|
||||
level: int|str,
|
||||
) -> bool:
|
||||
'''
|
||||
Predicate to test if a given level is active.
|
||||
|
||||
'''
|
||||
if isinstance(level, str):
|
||||
level: int = CUSTOM_LEVELS[level.upper()]
|
||||
|
||||
if log.getEffectiveLevel() <= level:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
# TODO: this isn't showing the correct '{filename}'
|
||||
# as it did before..
|
||||
class StackLevelAdapter(LoggerAdapter):
|
||||
|
||||
def at_least_level(
|
||||
self,
|
||||
level: str,
|
||||
) -> bool:
|
||||
return at_least_level(
|
||||
log=self,
|
||||
level=level,
|
||||
)
|
||||
|
||||
def transport(
|
||||
self,
|
||||
msg: str,
|
||||
|
@ -426,3 +401,19 @@ def get_loglevel() -> str:
|
|||
|
||||
# global module logger for tractor itself
|
||||
log: StackLevelAdapter = get_logger('tractor')
|
||||
|
||||
|
||||
def at_least_level(
|
||||
log: Logger|LoggerAdapter,
|
||||
level: int|str,
|
||||
) -> bool:
|
||||
'''
|
||||
Predicate to test if a given level is active.
|
||||
|
||||
'''
|
||||
if isinstance(level, str):
|
||||
level: int = CUSTOM_LEVELS[level.upper()]
|
||||
|
||||
if log.getEffectiveLevel() <= level:
|
||||
return True
|
||||
return False
|
||||
|
|
|
@ -177,16 +177,6 @@ class Aid(
|
|||
f'{self.name}@{self.pid!r}'
|
||||
)
|
||||
|
||||
# mk hashable via `.uuid`
|
||||
def __hash__(self) -> int:
|
||||
return hash(self.uuid)
|
||||
|
||||
def __eq__(self, other: Aid) -> bool:
|
||||
return self.uuid == other.uuid
|
||||
|
||||
# use pretty fmt since often repr-ed for console/log
|
||||
__repr__ = pretty_struct.Struct.__repr__
|
||||
|
||||
|
||||
class SpawnSpec(
|
||||
pretty_struct.Struct,
|
||||
|
|
|
@ -60,9 +60,6 @@ def find_masked_excs(
|
|||
return None
|
||||
|
||||
|
||||
# XXX, relevant ish discussion @ `trio`-core,
|
||||
# https://github.com/python-trio/trio/issues/455#issuecomment-2785122216
|
||||
#
|
||||
@acm
|
||||
async def maybe_raise_from_masking_exc(
|
||||
tn: trio.Nursery|None = None,
|
||||
|
|
Loading…
Reference in New Issue