Compare commits
10 Commits
49c61e40c7
...
c0058024c2
Author | SHA1 | Date |
---|---|---|
|
c0058024c2 | |
|
065104401c | |
|
3201437f4e | |
|
a9da16892d | |
|
1b609113c3 | |
|
4a80cda841 | |
|
131e2ee0a4 | |
|
79ef973058 | |
|
c738492879 | |
|
a931274da6 |
|
@ -21,7 +21,7 @@ Sub-process entry points.
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from functools import partial
|
from functools import partial
|
||||||
import multiprocessing as mp
|
import multiprocessing as mp
|
||||||
import os
|
# import os
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
|
@ -38,6 +38,7 @@ from .devx import (
|
||||||
_frame_stack,
|
_frame_stack,
|
||||||
pformat,
|
pformat,
|
||||||
)
|
)
|
||||||
|
# from .msg import pretty_struct
|
||||||
from .to_asyncio import run_as_asyncio_guest
|
from .to_asyncio import run_as_asyncio_guest
|
||||||
from ._addr import UnwrappedAddress
|
from ._addr import UnwrappedAddress
|
||||||
from ._runtime import (
|
from ._runtime import (
|
||||||
|
@ -127,20 +128,13 @@ def _trio_main(
|
||||||
|
|
||||||
if actor.loglevel is not None:
|
if actor.loglevel is not None:
|
||||||
get_console_log(actor.loglevel)
|
get_console_log(actor.loglevel)
|
||||||
actor_info: str = (
|
|
||||||
f'|_{actor}\n'
|
|
||||||
f' uid: {actor.uid}\n'
|
|
||||||
f' pid: {os.getpid()}\n'
|
|
||||||
f' parent_addr: {parent_addr}\n'
|
|
||||||
f' loglevel: {actor.loglevel}\n'
|
|
||||||
)
|
|
||||||
log.info(
|
log.info(
|
||||||
'Starting new `trio` subactor\n'
|
f'Starting `trio` subactor from parent @ '
|
||||||
|
f'{parent_addr}\n'
|
||||||
+
|
+
|
||||||
pformat.nest_from_op(
|
pformat.nest_from_op(
|
||||||
input_op='>(', # see syntax ideas above
|
input_op='>(', # see syntax ideas above
|
||||||
text=actor_info,
|
text=f'{actor}',
|
||||||
nest_indent=2, # since "complete"
|
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
logmeth = log.info
|
logmeth = log.info
|
||||||
|
@ -149,7 +143,7 @@ def _trio_main(
|
||||||
+
|
+
|
||||||
pformat.nest_from_op(
|
pformat.nest_from_op(
|
||||||
input_op=')>', # like a "closed-to-play"-icon from super perspective
|
input_op=')>', # like a "closed-to-play"-icon from super perspective
|
||||||
text=actor_info,
|
text=f'{actor}',
|
||||||
nest_indent=1,
|
nest_indent=1,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
@ -167,7 +161,7 @@ def _trio_main(
|
||||||
+
|
+
|
||||||
pformat.nest_from_op(
|
pformat.nest_from_op(
|
||||||
input_op='c)>', # closed due to cancel (see above)
|
input_op='c)>', # closed due to cancel (see above)
|
||||||
text=actor_info,
|
text=f'{actor}',
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
except BaseException as err:
|
except BaseException as err:
|
||||||
|
@ -177,7 +171,7 @@ def _trio_main(
|
||||||
+
|
+
|
||||||
pformat.nest_from_op(
|
pformat.nest_from_op(
|
||||||
input_op='x)>', # closed by error
|
input_op='x)>', # closed by error
|
||||||
text=actor_info,
|
text=f'{actor}',
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
# NOTE since we raise a tb will already be shown on the
|
# NOTE since we raise a tb will already be shown on the
|
||||||
|
|
101
tractor/_rpc.py
101
tractor/_rpc.py
|
@ -64,6 +64,7 @@ from .trionics import (
|
||||||
from .devx import (
|
from .devx import (
|
||||||
debug,
|
debug,
|
||||||
add_div,
|
add_div,
|
||||||
|
pformat as _pformat,
|
||||||
)
|
)
|
||||||
from . import _state
|
from . import _state
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
|
@ -72,7 +73,7 @@ from .msg import (
|
||||||
MsgCodec,
|
MsgCodec,
|
||||||
PayloadT,
|
PayloadT,
|
||||||
NamespacePath,
|
NamespacePath,
|
||||||
# pretty_struct,
|
pretty_struct,
|
||||||
_ops as msgops,
|
_ops as msgops,
|
||||||
)
|
)
|
||||||
from tractor.msg.types import (
|
from tractor.msg.types import (
|
||||||
|
@ -220,11 +221,18 @@ async def _invoke_non_context(
|
||||||
task_status.started(ctx)
|
task_status.started(ctx)
|
||||||
result = await coro
|
result = await coro
|
||||||
fname: str = func.__name__
|
fname: str = func.__name__
|
||||||
|
|
||||||
|
op_nested_task: str = _pformat.nest_from_op(
|
||||||
|
input_op=f')> cid: {ctx.cid!r}',
|
||||||
|
text=f'{ctx._task}',
|
||||||
|
nest_indent=1, # under >
|
||||||
|
)
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'RPC complete:\n'
|
f'RPC task complete\n'
|
||||||
f'task: {ctx._task}\n'
|
f'\n'
|
||||||
f'|_cid={ctx.cid}\n'
|
f'{op_nested_task}\n'
|
||||||
f'|_{fname}() -> {pformat(result)}\n'
|
f'\n'
|
||||||
|
f')> {fname}() -> {pformat(result)}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# NOTE: only send result if we know IPC isn't down
|
# NOTE: only send result if we know IPC isn't down
|
||||||
|
@ -1043,7 +1051,7 @@ async def process_messages(
|
||||||
):
|
):
|
||||||
target_cid: str = kwargs['cid']
|
target_cid: str = kwargs['cid']
|
||||||
kwargs |= {
|
kwargs |= {
|
||||||
'requesting_uid': chan.uid,
|
'requesting_aid': chan.aid,
|
||||||
'ipc_msg': msg,
|
'ipc_msg': msg,
|
||||||
|
|
||||||
# XXX NOTE! ONLY the rpc-task-owning
|
# XXX NOTE! ONLY the rpc-task-owning
|
||||||
|
@ -1079,21 +1087,34 @@ async def process_messages(
|
||||||
ns=ns,
|
ns=ns,
|
||||||
func=funcname,
|
func=funcname,
|
||||||
kwargs=kwargs, # type-spec this? see `msg.types`
|
kwargs=kwargs, # type-spec this? see `msg.types`
|
||||||
uid=actorid,
|
uid=actor_uuid,
|
||||||
):
|
):
|
||||||
|
if actor_uuid != chan.aid.uid:
|
||||||
|
raise RuntimeError(
|
||||||
|
f'IPC <Start> msg <-> chan.aid mismatch!?\n'
|
||||||
|
f'Channel.aid = {chan.aid!r}\n'
|
||||||
|
f'Start.uid = {actor_uuid!r}\n'
|
||||||
|
)
|
||||||
|
# await debug.pause()
|
||||||
|
op_repr: str = 'Start <=) '
|
||||||
|
req_repr: str = _pformat.nest_from_op(
|
||||||
|
input_op=op_repr,
|
||||||
|
op_suffix='',
|
||||||
|
nest_prefix='',
|
||||||
|
text=f'{chan}',
|
||||||
|
|
||||||
|
nest_indent=len(op_repr)-1,
|
||||||
|
rm_from_first_ln='<',
|
||||||
|
# ^XXX, subtract -1 to account for
|
||||||
|
# <Channel
|
||||||
|
# ^_chevron to be stripped
|
||||||
|
)
|
||||||
start_status: str = (
|
start_status: str = (
|
||||||
'Handling RPC `Start` request\n'
|
'Handling RPC request\n'
|
||||||
f'<= peer: {actorid}\n\n'
|
f'{req_repr}\n'
|
||||||
f' |_{chan}\n'
|
f'\n'
|
||||||
f' |_cid: {cid}\n\n'
|
f'->{{ ipc-context-id: {cid!r}\n'
|
||||||
# f' |_{ns}.{funcname}({kwargs})\n'
|
f'->{{ nsp for fn: `{ns}.{funcname}({kwargs})`\n'
|
||||||
f'>> {actor.uid}\n'
|
|
||||||
f' |_{actor}\n'
|
|
||||||
f' -> nsp: `{ns}.{funcname}({kwargs})`\n'
|
|
||||||
|
|
||||||
# f' |_{ns}.{funcname}({kwargs})\n\n'
|
|
||||||
|
|
||||||
# f'{pretty_struct.pformat(msg)}\n'
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# runtime-internal endpoint: `Actor.<funcname>`
|
# runtime-internal endpoint: `Actor.<funcname>`
|
||||||
|
@ -1122,10 +1143,6 @@ async def process_messages(
|
||||||
await chan.send(err_msg)
|
await chan.send(err_msg)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
start_status += (
|
|
||||||
f' -> func: {func}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
# schedule a task for the requested RPC function
|
# schedule a task for the requested RPC function
|
||||||
# in the actor's main "service nursery".
|
# in the actor's main "service nursery".
|
||||||
#
|
#
|
||||||
|
@ -1133,7 +1150,7 @@ async def process_messages(
|
||||||
# supervision isolation? would avoid having to
|
# supervision isolation? would avoid having to
|
||||||
# manage RPC tasks individually in `._rpc_tasks`
|
# manage RPC tasks individually in `._rpc_tasks`
|
||||||
# table?
|
# table?
|
||||||
start_status += ' -> scheduling new task..\n'
|
start_status += '->( scheduling new task..\n'
|
||||||
log.runtime(start_status)
|
log.runtime(start_status)
|
||||||
try:
|
try:
|
||||||
ctx: Context = await actor._service_n.start(
|
ctx: Context = await actor._service_n.start(
|
||||||
|
@ -1222,7 +1239,7 @@ async def process_messages(
|
||||||
f'|_{chan}\n'
|
f'|_{chan}\n'
|
||||||
)
|
)
|
||||||
await actor.cancel_rpc_tasks(
|
await actor.cancel_rpc_tasks(
|
||||||
req_uid=actor.uid,
|
req_aid=actor.aid,
|
||||||
# a "self cancel" in terms of the lifetime of the
|
# a "self cancel" in terms of the lifetime of the
|
||||||
# IPC connection which is presumed to be the
|
# IPC connection which is presumed to be the
|
||||||
# source of any requests for spawned tasks.
|
# source of any requests for spawned tasks.
|
||||||
|
@ -1294,13 +1311,37 @@ async def process_messages(
|
||||||
finally:
|
finally:
|
||||||
# msg debugging for when he machinery is brokey
|
# msg debugging for when he machinery is brokey
|
||||||
if msg is None:
|
if msg is None:
|
||||||
message: str = 'Exiting IPC msg loop without receiving a msg?'
|
message: str = 'Exiting RPC-loop without receiving a msg?'
|
||||||
else:
|
else:
|
||||||
|
task_op_repr: str = ')>'
|
||||||
|
task: trio.Task = trio.lowlevel.current_task()
|
||||||
|
|
||||||
|
# maybe add cancelled opt prefix
|
||||||
|
if task._cancel_status.effectively_cancelled:
|
||||||
|
task_op_repr = 'c' + task_op_repr
|
||||||
|
|
||||||
|
task_repr: str = _pformat.nest_from_op(
|
||||||
|
input_op=task_op_repr,
|
||||||
|
text=f'{task!r}',
|
||||||
|
nest_indent=1,
|
||||||
|
)
|
||||||
|
# chan_op_repr: str = '<=} '
|
||||||
|
# chan_repr: str = _pformat.nest_from_op(
|
||||||
|
# input_op=chan_op_repr,
|
||||||
|
# op_suffix='',
|
||||||
|
# nest_prefix='',
|
||||||
|
# text=chan.pformat(),
|
||||||
|
# nest_indent=len(chan_op_repr)-1,
|
||||||
|
# rm_from_first_ln='<',
|
||||||
|
# )
|
||||||
message: str = (
|
message: str = (
|
||||||
'Exiting IPC msg loop with final msg\n\n'
|
f'Exiting RPC-loop with final msg\n'
|
||||||
f'<= peer: {chan.uid}\n'
|
f'\n'
|
||||||
f' |_{chan}\n\n'
|
# f'{chan_repr}\n'
|
||||||
# f'{pretty_struct.pformat(msg)}'
|
f'{task_repr}\n'
|
||||||
|
f'\n'
|
||||||
|
f'{pretty_struct.pformat(msg)}'
|
||||||
|
f'\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
log.runtime(message)
|
log.runtime(message)
|
||||||
|
|
|
@ -262,6 +262,7 @@ def nest_from_op(
|
||||||
nest_indent: int|None = None,
|
nest_indent: int|None = None,
|
||||||
# XXX indent `next_prefix` "to-the-right-of" `input_op`
|
# XXX indent `next_prefix` "to-the-right-of" `input_op`
|
||||||
# by this count of whitespaces (' ').
|
# by this count of whitespaces (' ').
|
||||||
|
rm_from_first_ln: str|None = None,
|
||||||
|
|
||||||
) -> str:
|
) -> str:
|
||||||
'''
|
'''
|
||||||
|
@ -346,20 +347,35 @@ def nest_from_op(
|
||||||
if (
|
if (
|
||||||
nest_prefix
|
nest_prefix
|
||||||
and
|
and
|
||||||
nest_indent
|
nest_indent != 0
|
||||||
):
|
):
|
||||||
nest_prefix: str = textwrap.indent(
|
if nest_indent is not None:
|
||||||
nest_prefix,
|
nest_prefix: str = textwrap.indent(
|
||||||
prefix=nest_indent*' ',
|
nest_prefix,
|
||||||
)
|
prefix=nest_indent*' ',
|
||||||
|
)
|
||||||
|
nest_indent: int = len(nest_prefix)
|
||||||
|
|
||||||
|
# determine body-text indent either by,
|
||||||
|
# - using wtv explicit indent value is provided,
|
||||||
|
# OR
|
||||||
|
# - auto-calcing the indent to embed `text` under
|
||||||
|
# the `nest_prefix` if provided, **IFF** `nest_indent=None`.
|
||||||
|
tree_str_indent: int = 0
|
||||||
|
if nest_indent not in {0, None}:
|
||||||
|
tree_str_indent = nest_indent
|
||||||
|
elif (
|
||||||
|
nest_prefix
|
||||||
|
and
|
||||||
|
nest_indent != 0
|
||||||
|
):
|
||||||
|
tree_str_indent = len(nest_prefix)
|
||||||
|
|
||||||
indented_tree_str: str = text
|
indented_tree_str: str = text
|
||||||
tree_str_indent: int = 0
|
if tree_str_indent:
|
||||||
if nest_indent != 0:
|
|
||||||
tree_str_indent: int = len(nest_prefix)
|
|
||||||
indented_tree_str: str = textwrap.indent(
|
indented_tree_str: str = textwrap.indent(
|
||||||
text,
|
text,
|
||||||
prefix=' '*tree_str_indent
|
prefix=' '*tree_str_indent,
|
||||||
)
|
)
|
||||||
|
|
||||||
# inject any provided nesting-prefix chars
|
# inject any provided nesting-prefix chars
|
||||||
|
@ -369,18 +385,35 @@ def nest_from_op(
|
||||||
f'{nest_prefix}{indented_tree_str[tree_str_indent:]}'
|
f'{nest_prefix}{indented_tree_str[tree_str_indent:]}'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if (
|
||||||
|
not prefix_op
|
||||||
|
or
|
||||||
|
rm_from_first_ln
|
||||||
|
):
|
||||||
|
tree_lns: list[str] = indented_tree_str.splitlines()
|
||||||
|
first: str = tree_lns[0]
|
||||||
|
if rm_from_first_ln:
|
||||||
|
first = first.strip().replace(
|
||||||
|
rm_from_first_ln,
|
||||||
|
'',
|
||||||
|
)
|
||||||
|
indented_tree_str: str = '\n'.join(tree_lns[1:])
|
||||||
|
|
||||||
|
if prefix_op:
|
||||||
|
indented_tree_str = (
|
||||||
|
f'{first}\n'
|
||||||
|
f'{indented_tree_str}'
|
||||||
|
)
|
||||||
|
|
||||||
if prefix_op:
|
if prefix_op:
|
||||||
return (
|
return (
|
||||||
f'{input_op}{op_suffix}'
|
f'{input_op}{op_suffix}'
|
||||||
f'{indented_tree_str}'
|
f'{indented_tree_str}'
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
tree_lns: list[str] = indented_tree_str.splitlines()
|
|
||||||
first: str = tree_lns[0]
|
|
||||||
rest: str = '\n'.join(tree_lns[1:])
|
|
||||||
return (
|
return (
|
||||||
f'{first}{input_op}{op_suffix}'
|
f'{first}{input_op}{op_suffix}'
|
||||||
f'{rest}'
|
f'{indented_tree_str}'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -171,11 +171,23 @@ class Channel:
|
||||||
)
|
)
|
||||||
assert transport.raddr == addr
|
assert transport.raddr == addr
|
||||||
chan = Channel(transport=transport)
|
chan = Channel(transport=transport)
|
||||||
log.runtime(
|
|
||||||
f'Connected channel IPC transport\n'
|
# ?TODO, compact this into adapter level-methods?
|
||||||
f'[>\n'
|
# -[ ] would avoid extra repr-calcs if level not active?
|
||||||
f' |_{chan}\n'
|
# |_ how would the `calc_if_level` look though? func?
|
||||||
)
|
if log.at_least_level('runtime'):
|
||||||
|
from tractor.devx import (
|
||||||
|
pformat as _pformat,
|
||||||
|
)
|
||||||
|
chan_repr: str = _pformat.nest_from_op(
|
||||||
|
input_op='[>',
|
||||||
|
text=chan.pformat(),
|
||||||
|
nest_indent=1,
|
||||||
|
)
|
||||||
|
log.runtime(
|
||||||
|
f'Connected channel IPC transport\n'
|
||||||
|
f'{chan_repr}'
|
||||||
|
)
|
||||||
return chan
|
return chan
|
||||||
|
|
||||||
@cm
|
@cm
|
||||||
|
@ -218,17 +230,19 @@ class Channel:
|
||||||
if privates else ''
|
if privates else ''
|
||||||
) + ( # peer-actor (processs) section
|
) + ( # peer-actor (processs) section
|
||||||
f' |_peer: {self.aid.reprol()!r}\n'
|
f' |_peer: {self.aid.reprol()!r}\n'
|
||||||
if self.aid else '<unknown>'
|
if self.aid else ' |_peer: <unknown>\n'
|
||||||
) + (
|
) + (
|
||||||
f' |_msgstream: {tpt_name}\n'
|
f' |_msgstream: {tpt_name}\n'
|
||||||
f' proto={tpt.laddr.proto_key!r}\n'
|
f' maddr: {tpt.maddr!r}\n'
|
||||||
f' layer={tpt.layer_key!r}\n'
|
f' proto: {tpt.laddr.proto_key!r}\n'
|
||||||
f' laddr={tpt.laddr}\n'
|
f' layer: {tpt.layer_key!r}\n'
|
||||||
f' raddr={tpt.raddr}\n'
|
f' codec: {tpt.codec_key!r}\n'
|
||||||
f' codec={tpt.codec_key!r}\n'
|
f' .laddr={tpt.laddr}\n'
|
||||||
f' stream={tpt.stream}\n'
|
f' .raddr={tpt.raddr}\n'
|
||||||
f' maddr={tpt.maddr!r}\n'
|
) + (
|
||||||
f' drained={tpt.drained}\n'
|
f' ._transport.stream={tpt.stream}\n'
|
||||||
|
f' ._transport.drained={tpt.drained}\n'
|
||||||
|
if privates else ''
|
||||||
) + (
|
) + (
|
||||||
f' _send_lock={tpt._send_lock.statistics()}\n'
|
f' _send_lock={tpt._send_lock.statistics()}\n'
|
||||||
if privates else ''
|
if privates else ''
|
||||||
|
@ -257,6 +271,10 @@ class Channel:
|
||||||
def raddr(self) -> Address|None:
|
def raddr(self) -> Address|None:
|
||||||
return self._transport.raddr if self._transport else None
|
return self._transport.raddr if self._transport else None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def maddr(self) -> str:
|
||||||
|
return self._transport.maddr if self._transport else '<no-tpt>'
|
||||||
|
|
||||||
# TODO: something like,
|
# TODO: something like,
|
||||||
# `pdbp.hideframe_on(errors=[MsgTypeError])`
|
# `pdbp.hideframe_on(errors=[MsgTypeError])`
|
||||||
# instead of the `try/except` hack we have rn..
|
# instead of the `try/except` hack we have rn..
|
||||||
|
@ -444,8 +462,8 @@ class Channel:
|
||||||
await self.send(aid)
|
await self.send(aid)
|
||||||
peer_aid: Aid = await self.recv()
|
peer_aid: Aid = await self.recv()
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'Received hanshake with peer actor,\n'
|
f'Received hanshake with peer\n'
|
||||||
f'{peer_aid}\n'
|
f'<= {peer_aid.reprol(sin_uuid=False)}\n'
|
||||||
)
|
)
|
||||||
# NOTE, we always are referencing the remote peer!
|
# NOTE, we always are referencing the remote peer!
|
||||||
self.aid = peer_aid
|
self.aid = peer_aid
|
||||||
|
|
|
@ -17,9 +17,16 @@
|
||||||
Utils to tame mp non-SC madeness
|
Utils to tame mp non-SC madeness
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
# !TODO! in 3.13 this can be disabled (the-same/similarly) using
|
||||||
|
# a flag,
|
||||||
|
# - [ ] soo if it works like this, drop this module entirely for
|
||||||
|
# 3.13+ B)
|
||||||
|
# |_https://docs.python.org/3/library/multiprocessing.shared_memory.html
|
||||||
|
#
|
||||||
def disable_mantracker():
|
def disable_mantracker():
|
||||||
'''
|
'''
|
||||||
Disable all ``multiprocessing``` "resource tracking" machinery since
|
Disable all `multiprocessing` "resource tracking" machinery since
|
||||||
it's an absolute multi-threaded mess of non-SC madness.
|
it's an absolute multi-threaded mess of non-SC madness.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
|
@ -26,7 +26,7 @@ from contextlib import (
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from itertools import chain
|
from itertools import chain
|
||||||
import inspect
|
import inspect
|
||||||
from pprint import pformat
|
import textwrap
|
||||||
from types import (
|
from types import (
|
||||||
ModuleType,
|
ModuleType,
|
||||||
)
|
)
|
||||||
|
@ -43,7 +43,10 @@ from trio import (
|
||||||
SocketListener,
|
SocketListener,
|
||||||
)
|
)
|
||||||
|
|
||||||
# from ..devx import debug
|
from ..devx.pformat import (
|
||||||
|
ppfmt,
|
||||||
|
nest_from_op,
|
||||||
|
)
|
||||||
from .._exceptions import (
|
from .._exceptions import (
|
||||||
TransportClosed,
|
TransportClosed,
|
||||||
)
|
)
|
||||||
|
@ -141,9 +144,8 @@ async def maybe_wait_on_canced_subs(
|
||||||
|
|
||||||
):
|
):
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Waiting on cancel request to peer..\n'
|
'Waiting on cancel request to peer\n'
|
||||||
f'c)=>\n'
|
f'c)=> {chan.aid.reprol()}@[{chan.maddr}]\n'
|
||||||
f' |_{chan.aid}\n'
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX: this is a soft wait on the channel (and its
|
# XXX: this is a soft wait on the channel (and its
|
||||||
|
@ -179,7 +181,7 @@ async def maybe_wait_on_canced_subs(
|
||||||
log.warning(
|
log.warning(
|
||||||
'Draining msg from disconnected peer\n'
|
'Draining msg from disconnected peer\n'
|
||||||
f'{chan_info}'
|
f'{chan_info}'
|
||||||
f'{pformat(msg)}\n'
|
f'{ppfmt(msg)}\n'
|
||||||
)
|
)
|
||||||
# cid: str|None = msg.get('cid')
|
# cid: str|None = msg.get('cid')
|
||||||
cid: str|None = msg.cid
|
cid: str|None = msg.cid
|
||||||
|
@ -248,7 +250,7 @@ async def maybe_wait_on_canced_subs(
|
||||||
if children := local_nursery._children:
|
if children := local_nursery._children:
|
||||||
# indent from above local-nurse repr
|
# indent from above local-nurse repr
|
||||||
report += (
|
report += (
|
||||||
f' |_{pformat(children)}\n'
|
f' |_{ppfmt(children)}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
log.warning(report)
|
log.warning(report)
|
||||||
|
@ -279,8 +281,9 @@ async def maybe_wait_on_canced_subs(
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'Peer IPC broke but subproc is alive?\n\n'
|
f'Peer IPC broke but subproc is alive?\n\n'
|
||||||
|
|
||||||
f'<=x {chan.aid}@{chan.raddr}\n'
|
f'<=x {chan.aid.reprol()}@[{chan.maddr}]\n'
|
||||||
f' |_{proc}\n'
|
f'\n'
|
||||||
|
f'{proc}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
return local_nursery
|
return local_nursery
|
||||||
|
@ -324,9 +327,10 @@ async def handle_stream_from_peer(
|
||||||
|
|
||||||
chan = Channel.from_stream(stream)
|
chan = Channel.from_stream(stream)
|
||||||
con_status: str = (
|
con_status: str = (
|
||||||
'New inbound IPC connection <=\n'
|
f'New inbound IPC transport connection\n'
|
||||||
f'|_{chan}\n'
|
f'<=( {stream!r}\n'
|
||||||
)
|
)
|
||||||
|
con_status_steps: str = ''
|
||||||
|
|
||||||
# initial handshake with peer phase
|
# initial handshake with peer phase
|
||||||
try:
|
try:
|
||||||
|
@ -372,7 +376,7 @@ async def handle_stream_from_peer(
|
||||||
if _pre_chan := server._peers.get(uid):
|
if _pre_chan := server._peers.get(uid):
|
||||||
familiar: str = 'pre-existing-peer'
|
familiar: str = 'pre-existing-peer'
|
||||||
uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
|
uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
|
||||||
con_status += (
|
con_status_steps += (
|
||||||
f' -> Handshake with {familiar} `{uid_short}` complete\n'
|
f' -> Handshake with {familiar} `{uid_short}` complete\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -397,7 +401,7 @@ async def handle_stream_from_peer(
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
if event:
|
if event:
|
||||||
con_status += (
|
con_status_steps += (
|
||||||
' -> Waking subactor spawn waiters: '
|
' -> Waking subactor spawn waiters: '
|
||||||
f'{event.statistics().tasks_waiting}\n'
|
f'{event.statistics().tasks_waiting}\n'
|
||||||
f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
|
f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
|
||||||
|
@ -408,7 +412,7 @@ async def handle_stream_from_peer(
|
||||||
event.set()
|
event.set()
|
||||||
|
|
||||||
else:
|
else:
|
||||||
con_status += (
|
con_status_steps += (
|
||||||
f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
|
f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
|
||||||
) # type: ignore
|
) # type: ignore
|
||||||
|
|
||||||
|
@ -422,8 +426,15 @@ async def handle_stream_from_peer(
|
||||||
# TODO: can we just use list-ref directly?
|
# TODO: can we just use list-ref directly?
|
||||||
chans.append(chan)
|
chans.append(chan)
|
||||||
|
|
||||||
con_status += ' -> Entering RPC msg loop..\n'
|
con_status_steps += ' -> Entering RPC msg loop..\n'
|
||||||
log.runtime(con_status)
|
log.runtime(
|
||||||
|
con_status
|
||||||
|
+
|
||||||
|
textwrap.indent(
|
||||||
|
con_status_steps,
|
||||||
|
prefix=' '*3, # align to first-ln
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
# Begin channel management - respond to remote requests and
|
# Begin channel management - respond to remote requests and
|
||||||
# process received reponses.
|
# process received reponses.
|
||||||
|
@ -456,41 +467,67 @@ async def handle_stream_from_peer(
|
||||||
disconnected=disconnected,
|
disconnected=disconnected,
|
||||||
)
|
)
|
||||||
|
|
||||||
# ``Channel`` teardown and closure sequence
|
# `Channel` teardown and closure sequence
|
||||||
# drop ref to channel so it can be gc-ed and disconnected
|
# drop ref to channel so it can be gc-ed and disconnected
|
||||||
con_teardown_status: str = (
|
#
|
||||||
f'IPC channel disconnected:\n'
|
# -[x]TODO mk this be like
|
||||||
f'<=x uid: {chan.aid}\n'
|
# <=x Channel(
|
||||||
f' |_{pformat(chan)}\n\n'
|
# |_field: blah
|
||||||
|
# )>
|
||||||
|
op_repr: str = '<=x '
|
||||||
|
chan_repr: str = nest_from_op(
|
||||||
|
input_op=op_repr,
|
||||||
|
op_suffix='',
|
||||||
|
nest_prefix='',
|
||||||
|
text=chan.pformat(),
|
||||||
|
nest_indent=len(op_repr)-1,
|
||||||
|
rm_from_first_ln='<',
|
||||||
)
|
)
|
||||||
|
|
||||||
|
con_teardown_status: str = (
|
||||||
|
f'IPC channel disconnect\n'
|
||||||
|
f'\n'
|
||||||
|
f'{chan_repr}\n'
|
||||||
|
f'\n'
|
||||||
|
)
|
||||||
|
|
||||||
chans.remove(chan)
|
chans.remove(chan)
|
||||||
|
|
||||||
# TODO: do we need to be this pedantic?
|
# TODO: do we need to be this pedantic?
|
||||||
if not chans:
|
if not chans:
|
||||||
con_teardown_status += (
|
con_teardown_status += (
|
||||||
f'-> No more channels with {chan.aid}'
|
f'-> No more channels with {chan.aid.reprol()!r}\n'
|
||||||
)
|
)
|
||||||
server._peers.pop(uid, None)
|
server._peers.pop(uid, None)
|
||||||
|
|
||||||
peers_str: str = ''
|
if peers := list(server._peers.values()):
|
||||||
for uid, chans in server._peers.items():
|
peer_cnt: int = len(peers)
|
||||||
peers_str += (
|
if (
|
||||||
f'uid: {uid}\n'
|
(first := peers[0][0]) is not chan
|
||||||
)
|
and
|
||||||
for i, chan in enumerate(chans):
|
not disconnected
|
||||||
peers_str += (
|
and
|
||||||
f' |_[{i}] {pformat(chan)}\n'
|
peer_cnt > 1
|
||||||
|
):
|
||||||
|
con_teardown_status += (
|
||||||
|
f'-> Remaining IPC {peer_cnt-1!r} peers:\n'
|
||||||
)
|
)
|
||||||
|
for chans in server._peers.values():
|
||||||
con_teardown_status += (
|
first: Channel = chans[0]
|
||||||
f'-> Remaining IPC {len(server._peers)} peers: {peers_str}\n'
|
if not (
|
||||||
)
|
first is chan
|
||||||
|
and
|
||||||
|
disconnected
|
||||||
|
):
|
||||||
|
con_teardown_status += (
|
||||||
|
f' |_{first.aid.reprol()!r} -> {len(chans)!r} chans\n'
|
||||||
|
)
|
||||||
|
|
||||||
# No more channels to other actors (at all) registered
|
# No more channels to other actors (at all) registered
|
||||||
# as connected.
|
# as connected.
|
||||||
if not server._peers:
|
if not server._peers:
|
||||||
con_teardown_status += (
|
con_teardown_status += (
|
||||||
'Signalling no more peer channel connections'
|
'-> Signalling no more peer connections!\n'
|
||||||
)
|
)
|
||||||
server._no_more_peers.set()
|
server._no_more_peers.set()
|
||||||
|
|
||||||
|
@ -579,10 +616,10 @@ async def handle_stream_from_peer(
|
||||||
|
|
||||||
class Endpoint(Struct):
|
class Endpoint(Struct):
|
||||||
'''
|
'''
|
||||||
An instance of an IPC "bound" address where the lifetime of the
|
An instance of an IPC "bound" address where the lifetime of an
|
||||||
"ability to accept connections" (from clients) and then handle
|
"ability to accept connections" and handle the subsequent
|
||||||
those inbound sessions or sequences-of-packets is determined by
|
sequence-of-packets (maybe oriented as sessions) is determined by
|
||||||
a (maybe pair of) nurser(y/ies).
|
the underlying nursery scope(s).
|
||||||
|
|
||||||
'''
|
'''
|
||||||
addr: Address
|
addr: Address
|
||||||
|
@ -600,6 +637,24 @@ class Endpoint(Struct):
|
||||||
MsgTransport, # handle to encoded-msg transport stream
|
MsgTransport, # handle to encoded-msg transport stream
|
||||||
] = {}
|
] = {}
|
||||||
|
|
||||||
|
def pformat(
|
||||||
|
self,
|
||||||
|
indent: int = 0,
|
||||||
|
privates: bool = False,
|
||||||
|
) -> str:
|
||||||
|
type_repr: str = type(self).__name__
|
||||||
|
fmtstr: str = (
|
||||||
|
# !TODO, always be ns aware!
|
||||||
|
# f'|_netns: {netns}\n'
|
||||||
|
f' |.addr: {self.addr!r}\n'
|
||||||
|
f' |_peers: {len(self.peer_tpts)}\n'
|
||||||
|
)
|
||||||
|
return (
|
||||||
|
f'<{type_repr}(\n'
|
||||||
|
f'{fmtstr}'
|
||||||
|
f')>'
|
||||||
|
)
|
||||||
|
|
||||||
async def start_listener(self) -> SocketListener:
|
async def start_listener(self) -> SocketListener:
|
||||||
tpt_mod: ModuleType = inspect.getmodule(self.addr)
|
tpt_mod: ModuleType = inspect.getmodule(self.addr)
|
||||||
lstnr: SocketListener = await tpt_mod.start_listener(
|
lstnr: SocketListener = await tpt_mod.start_listener(
|
||||||
|
@ -639,11 +694,13 @@ class Endpoint(Struct):
|
||||||
class Server(Struct):
|
class Server(Struct):
|
||||||
_parent_tn: Nursery
|
_parent_tn: Nursery
|
||||||
_stream_handler_tn: Nursery
|
_stream_handler_tn: Nursery
|
||||||
|
|
||||||
# level-triggered sig for whether "no peers are currently
|
# level-triggered sig for whether "no peers are currently
|
||||||
# connected"; field is **always** set to an instance but
|
# connected"; field is **always** set to an instance but
|
||||||
# initialized with `.is_set() == True`.
|
# initialized with `.is_set() == True`.
|
||||||
_no_more_peers: trio.Event
|
_no_more_peers: trio.Event
|
||||||
|
|
||||||
|
# active eps as allocated by `.listen_on()`
|
||||||
_endpoints: list[Endpoint] = []
|
_endpoints: list[Endpoint] = []
|
||||||
|
|
||||||
# connection tracking & mgmt
|
# connection tracking & mgmt
|
||||||
|
@ -651,12 +708,19 @@ class Server(Struct):
|
||||||
str, # uaid
|
str, # uaid
|
||||||
list[Channel], # IPC conns from peer
|
list[Channel], # IPC conns from peer
|
||||||
] = defaultdict(list)
|
] = defaultdict(list)
|
||||||
|
|
||||||
|
# events-table with entries registered unset while the local
|
||||||
|
# actor is waiting on a new actor to inbound connect, often
|
||||||
|
# a parent waiting on its child just after spawn.
|
||||||
_peer_connected: dict[
|
_peer_connected: dict[
|
||||||
tuple[str, str],
|
tuple[str, str],
|
||||||
trio.Event,
|
trio.Event,
|
||||||
] = {}
|
] = {}
|
||||||
|
|
||||||
# syncs for setup/teardown sequences
|
# syncs for setup/teardown sequences
|
||||||
|
# - null when not yet booted,
|
||||||
|
# - unset when active,
|
||||||
|
# - set when fully shutdown with 0 eps active.
|
||||||
_shutdown: trio.Event|None = None
|
_shutdown: trio.Event|None = None
|
||||||
|
|
||||||
# TODO, maybe just make `._endpoints: list[Endpoint]` and
|
# TODO, maybe just make `._endpoints: list[Endpoint]` and
|
||||||
|
@ -664,7 +728,6 @@ class Server(Struct):
|
||||||
# @property
|
# @property
|
||||||
# def addrs2eps(self) -> dict[Address, Endpoint]:
|
# def addrs2eps(self) -> dict[Address, Endpoint]:
|
||||||
# ...
|
# ...
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def proto_keys(self) -> list[str]:
|
def proto_keys(self) -> list[str]:
|
||||||
return [
|
return [
|
||||||
|
@ -690,7 +753,7 @@ class Server(Struct):
|
||||||
# TODO: obvi a different server type when we eventually
|
# TODO: obvi a different server type when we eventually
|
||||||
# support some others XD
|
# support some others XD
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'Cancelling server(s) for\n'
|
f'Cancelling server(s) for tpt-protos\n'
|
||||||
f'{self.proto_keys!r}\n'
|
f'{self.proto_keys!r}\n'
|
||||||
)
|
)
|
||||||
self._parent_tn.cancel_scope.cancel()
|
self._parent_tn.cancel_scope.cancel()
|
||||||
|
@ -717,6 +780,14 @@ class Server(Struct):
|
||||||
f'protos: {tpt_protos!r}\n'
|
f'protos: {tpt_protos!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def len_peers(
|
||||||
|
self,
|
||||||
|
) -> int:
|
||||||
|
return len([
|
||||||
|
chan.connected()
|
||||||
|
for chan in chain(*self._peers.values())
|
||||||
|
])
|
||||||
|
|
||||||
def has_peers(
|
def has_peers(
|
||||||
self,
|
self,
|
||||||
check_chans: bool = False,
|
check_chans: bool = False,
|
||||||
|
@ -730,13 +801,11 @@ class Server(Struct):
|
||||||
has_peers
|
has_peers
|
||||||
and
|
and
|
||||||
check_chans
|
check_chans
|
||||||
|
and
|
||||||
|
(peer_cnt := self.len_peers())
|
||||||
):
|
):
|
||||||
has_peers: bool = (
|
has_peers: bool = (
|
||||||
any(chan.connected()
|
peer_cnt > 0
|
||||||
for chan in chain(
|
|
||||||
*self._peers.values()
|
|
||||||
)
|
|
||||||
)
|
|
||||||
and
|
and
|
||||||
has_peers
|
has_peers
|
||||||
)
|
)
|
||||||
|
@ -803,30 +872,66 @@ class Server(Struct):
|
||||||
|
|
||||||
return ev.is_set()
|
return ev.is_set()
|
||||||
|
|
||||||
def pformat(self) -> str:
|
@property
|
||||||
|
def repr_state(self) -> str:
|
||||||
|
'''
|
||||||
|
A `str`-status describing the current state of this
|
||||||
|
IPC server in terms of the current operating "phase".
|
||||||
|
|
||||||
|
'''
|
||||||
|
status = 'server is active'
|
||||||
|
if self.has_peers():
|
||||||
|
peer_cnt: int = self.len_peers()
|
||||||
|
status: str = (
|
||||||
|
f'{peer_cnt!r} peer chans'
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
status: str = 'No peer chans'
|
||||||
|
|
||||||
|
if self.is_shutdown():
|
||||||
|
status: str = 'server-shutdown'
|
||||||
|
|
||||||
|
return status
|
||||||
|
|
||||||
|
def pformat(
|
||||||
|
self,
|
||||||
|
privates: bool = False,
|
||||||
|
) -> str:
|
||||||
eps: list[Endpoint] = self._endpoints
|
eps: list[Endpoint] = self._endpoints
|
||||||
|
|
||||||
state_repr: str = (
|
# state_repr: str = (
|
||||||
f'{len(eps)!r} IPC-endpoints active'
|
# f'{len(eps)!r} endpoints active'
|
||||||
)
|
# )
|
||||||
fmtstr = (
|
fmtstr = (
|
||||||
f' |_state: {state_repr}\n'
|
f' |_state: {self.repr_state!r}\n'
|
||||||
f' no_more_peers: {self.has_peers()}\n'
|
|
||||||
)
|
)
|
||||||
if self._shutdown is not None:
|
if privates:
|
||||||
shutdown_stats: EventStatistics = self._shutdown.statistics()
|
fmtstr += f' no_more_peers: {self.has_peers()}\n'
|
||||||
|
|
||||||
|
if self._shutdown is not None:
|
||||||
|
shutdown_stats: EventStatistics = self._shutdown.statistics()
|
||||||
|
fmtstr += (
|
||||||
|
f' task_waiting_on_shutdown: {shutdown_stats}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
if eps := self._endpoints:
|
||||||
|
addrs: list[tuple] = [
|
||||||
|
ep.addr for ep in eps
|
||||||
|
]
|
||||||
|
repr_eps: str = ppfmt(addrs)
|
||||||
|
|
||||||
fmtstr += (
|
fmtstr += (
|
||||||
f' task_waiting_on_shutdown: {shutdown_stats}\n'
|
f' |_endpoints: {repr_eps}\n'
|
||||||
|
# ^TODO? how to indent closing ']'..
|
||||||
)
|
)
|
||||||
|
|
||||||
fmtstr += (
|
if peers := self._peers:
|
||||||
# TODO, use the `ppfmt()` helper from `modden`!
|
fmtstr += (
|
||||||
f' |_endpoints: {pformat(self._endpoints)}\n'
|
f' |_peers: {len(peers)} connected\n'
|
||||||
f' |_peers: {len(self._peers)} connected\n'
|
)
|
||||||
)
|
|
||||||
|
|
||||||
return (
|
return (
|
||||||
f'<IPCServer(\n'
|
f'<Server(\n'
|
||||||
f'{fmtstr}'
|
f'{fmtstr}'
|
||||||
f')>\n'
|
f')>\n'
|
||||||
)
|
)
|
||||||
|
@ -885,8 +990,8 @@ class Server(Struct):
|
||||||
)
|
)
|
||||||
|
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'Binding to endpoints for,\n'
|
f'Binding endpoints\n'
|
||||||
f'{accept_addrs}\n'
|
f'{ppfmt(accept_addrs)}\n'
|
||||||
)
|
)
|
||||||
eps: list[Endpoint] = await self._parent_tn.start(
|
eps: list[Endpoint] = await self._parent_tn.start(
|
||||||
partial(
|
partial(
|
||||||
|
@ -896,13 +1001,19 @@ class Server(Struct):
|
||||||
listen_addrs=accept_addrs,
|
listen_addrs=accept_addrs,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
self._endpoints.extend(eps)
|
||||||
|
|
||||||
|
serv_repr: str = nest_from_op(
|
||||||
|
input_op='(>',
|
||||||
|
text=self.pformat(),
|
||||||
|
nest_indent=1,
|
||||||
|
)
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'Started IPC endpoints\n'
|
f'Started IPC server\n'
|
||||||
f'{eps}\n'
|
f'{serv_repr}'
|
||||||
)
|
)
|
||||||
|
|
||||||
self._endpoints.extend(eps)
|
# XXX, a little sanity on new ep allocations
|
||||||
# XXX, just a little bit of sanity
|
|
||||||
group_tn: Nursery|None = None
|
group_tn: Nursery|None = None
|
||||||
ep: Endpoint
|
ep: Endpoint
|
||||||
for ep in eps:
|
for ep in eps:
|
||||||
|
@ -956,9 +1067,13 @@ async def _serve_ipc_eps(
|
||||||
stream_handler_tn=stream_handler_tn,
|
stream_handler_tn=stream_handler_tn,
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
|
ep_sclang: str = nest_from_op(
|
||||||
|
input_op='>[',
|
||||||
|
text=f'{ep.pformat()}',
|
||||||
|
)
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'Starting new endpoint listener\n'
|
f'Starting new endpoint listener\n'
|
||||||
f'{ep}\n'
|
f'{ep_sclang}\n'
|
||||||
)
|
)
|
||||||
listener: trio.abc.Listener = await ep.start_listener()
|
listener: trio.abc.Listener = await ep.start_listener()
|
||||||
assert listener is ep._listener
|
assert listener is ep._listener
|
||||||
|
@ -996,17 +1111,6 @@ async def _serve_ipc_eps(
|
||||||
handler_nursery=stream_handler_tn
|
handler_nursery=stream_handler_tn
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
# TODO, wow make this message better! XD
|
|
||||||
log.runtime(
|
|
||||||
'Started server(s)\n'
|
|
||||||
+
|
|
||||||
'\n'.join([f'|_{addr}' for addr in listen_addrs])
|
|
||||||
)
|
|
||||||
|
|
||||||
log.runtime(
|
|
||||||
f'Started IPC endpoints\n'
|
|
||||||
f'{eps}\n'
|
|
||||||
)
|
|
||||||
task_status.started(
|
task_status.started(
|
||||||
eps,
|
eps,
|
||||||
)
|
)
|
||||||
|
@ -1049,8 +1153,7 @@ async def open_ipc_server(
|
||||||
try:
|
try:
|
||||||
yield ipc_server
|
yield ipc_server
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'Waiting on server to shutdown or be cancelled..\n'
|
'Server-tn running until terminated\n'
|
||||||
f'{ipc_server}'
|
|
||||||
)
|
)
|
||||||
# TODO? when if ever would we want/need this?
|
# TODO? when if ever would we want/need this?
|
||||||
# with trio.CancelScope(shield=True):
|
# with trio.CancelScope(shield=True):
|
||||||
|
|
|
@ -127,10 +127,9 @@ async def start_listener(
|
||||||
Start a TCP socket listener on the given `TCPAddress`.
|
Start a TCP socket listener on the given `TCPAddress`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
log.info(
|
log.runtime(
|
||||||
f'Attempting to bind TCP socket\n'
|
f'Trying socket bind\n'
|
||||||
f'>[\n'
|
f'>[ {addr}\n'
|
||||||
f'|_{addr}\n'
|
|
||||||
)
|
)
|
||||||
# ?TODO, maybe we should just change the lower-level call this is
|
# ?TODO, maybe we should just change the lower-level call this is
|
||||||
# using internall per-listener?
|
# using internall per-listener?
|
||||||
|
@ -145,11 +144,10 @@ async def start_listener(
|
||||||
assert len(listeners) == 1
|
assert len(listeners) == 1
|
||||||
listener = listeners[0]
|
listener = listeners[0]
|
||||||
host, port = listener.socket.getsockname()[:2]
|
host, port = listener.socket.getsockname()[:2]
|
||||||
|
bound_addr: TCPAddress = type(addr).from_addr((host, port))
|
||||||
log.info(
|
log.info(
|
||||||
f'Listening on TCP socket\n'
|
f'Listening on TCP socket\n'
|
||||||
f'[>\n'
|
f'[> {bound_addr}\n'
|
||||||
f' |_{addr}\n'
|
|
||||||
)
|
)
|
||||||
return listener
|
return listener
|
||||||
|
|
||||||
|
|
|
@ -81,10 +81,35 @@ BOLD_PALETTE = {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def at_least_level(
|
||||||
|
log: Logger|LoggerAdapter,
|
||||||
|
level: int|str,
|
||||||
|
) -> bool:
|
||||||
|
'''
|
||||||
|
Predicate to test if a given level is active.
|
||||||
|
|
||||||
|
'''
|
||||||
|
if isinstance(level, str):
|
||||||
|
level: int = CUSTOM_LEVELS[level.upper()]
|
||||||
|
|
||||||
|
if log.getEffectiveLevel() <= level:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
# TODO: this isn't showing the correct '{filename}'
|
# TODO: this isn't showing the correct '{filename}'
|
||||||
# as it did before..
|
# as it did before..
|
||||||
class StackLevelAdapter(LoggerAdapter):
|
class StackLevelAdapter(LoggerAdapter):
|
||||||
|
|
||||||
|
def at_least_level(
|
||||||
|
self,
|
||||||
|
level: str,
|
||||||
|
) -> bool:
|
||||||
|
return at_least_level(
|
||||||
|
log=self,
|
||||||
|
level=level,
|
||||||
|
)
|
||||||
|
|
||||||
def transport(
|
def transport(
|
||||||
self,
|
self,
|
||||||
msg: str,
|
msg: str,
|
||||||
|
@ -401,19 +426,3 @@ def get_loglevel() -> str:
|
||||||
|
|
||||||
# global module logger for tractor itself
|
# global module logger for tractor itself
|
||||||
log: StackLevelAdapter = get_logger('tractor')
|
log: StackLevelAdapter = get_logger('tractor')
|
||||||
|
|
||||||
|
|
||||||
def at_least_level(
|
|
||||||
log: Logger|LoggerAdapter,
|
|
||||||
level: int|str,
|
|
||||||
) -> bool:
|
|
||||||
'''
|
|
||||||
Predicate to test if a given level is active.
|
|
||||||
|
|
||||||
'''
|
|
||||||
if isinstance(level, str):
|
|
||||||
level: int = CUSTOM_LEVELS[level.upper()]
|
|
||||||
|
|
||||||
if log.getEffectiveLevel() <= level:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
|
@ -177,6 +177,16 @@ class Aid(
|
||||||
f'{self.name}@{self.pid!r}'
|
f'{self.name}@{self.pid!r}'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# mk hashable via `.uuid`
|
||||||
|
def __hash__(self) -> int:
|
||||||
|
return hash(self.uuid)
|
||||||
|
|
||||||
|
def __eq__(self, other: Aid) -> bool:
|
||||||
|
return self.uuid == other.uuid
|
||||||
|
|
||||||
|
# use pretty fmt since often repr-ed for console/log
|
||||||
|
__repr__ = pretty_struct.Struct.__repr__
|
||||||
|
|
||||||
|
|
||||||
class SpawnSpec(
|
class SpawnSpec(
|
||||||
pretty_struct.Struct,
|
pretty_struct.Struct,
|
||||||
|
|
|
@ -60,6 +60,9 @@ def find_masked_excs(
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# XXX, relevant ish discussion @ `trio`-core,
|
||||||
|
# https://github.com/python-trio/trio/issues/455#issuecomment-2785122216
|
||||||
|
#
|
||||||
@acm
|
@acm
|
||||||
async def maybe_raise_from_masking_exc(
|
async def maybe_raise_from_masking_exc(
|
||||||
tn: trio.Nursery|None = None,
|
tn: trio.Nursery|None = None,
|
||||||
|
|
Loading…
Reference in New Issue