Compare commits

...

9 Commits

Author SHA1 Message Date
Tyler Goodlet 4a9e8731f1 More prep-to-reduce the `Actor` method-iface
- drop the (never/un)used `.get_chans()`.
- add #TODO for factoring many methods into a new `.rpc`-subsys/pkg
  primitive, like an `RPCMngr/Server` type eventually.
- add todo to maybe mv `.get_parent()` elsewhere?
- move masked `._hard_mofo_kill()` to bottom.
2025-07-08 12:44:46 -04:00
Tyler Goodlet b6ffaea62f Mk `.ipc._tcp.TCPAddress` validate with `ipaddress`
Both via a post-init method to validate the original input `._host: str`
and in `.is_valid` to ensure the host-part isn't something, esoteric..
2025-07-08 12:42:34 -04:00
Tyler Goodlet 63bf967598 Add an `enable_transports` test-suite
Like it sounds, verifying that when that param is passed to the runtime
startup eps (`.open_root_actor()/.open_nursery()`), the appropriate
tpt-protocol is deployed for IPC (both the server and bound endpoints)
in both the root and any sub-actors (as passed down from rent to child
via the `.msg.types.SpawnSpec`).
2025-07-08 12:40:55 -04:00
Tyler Goodlet 011d033a12 Add `.ipc._shm` todo-idea for `@actor_fixture` API 2025-07-08 11:07:50 -04:00
Tyler Goodlet 76fb80fda6 Update buncha log msg fmting in `.msg._ops`
Mostly just multi-line code styling again: always putting standalone
`'f\n'` on separate LOC so it reads like it renders to console. Oh and
and a level drop to `.runtime()` for rx-msg reports.
2025-07-08 11:01:44 -04:00
Tyler Goodlet d50f7ba9ca Couple more `._root` logging tweaks.. 2025-07-07 23:13:14 -04:00
Tyler Goodlet 65ae2dc67c Update buncha log msg fmting in `._spawn`
Again using `Channel.aid.reprol()`, `.devx.pformat.nest_from_op()` and
 converting to multi-line code style an ' for str-report-contents. Tweak
 some imports to sub-mod level as well.
2025-07-07 19:11:01 -04:00
Tyler Goodlet 4be499fb1f Update buncha log msg fmting in `._portal`
Namely to use `Channel.aid.reprol()` and converting to our newer style
multi-line code style for str-reports.
2025-07-07 15:46:53 -04:00
Tyler Goodlet 7317bb269c Use `._supervise._shutdown_msg` in tooling test 2025-07-07 14:31:34 -04:00
9 changed files with 289 additions and 99 deletions

View File

@ -116,9 +116,11 @@ def test_shield_pause(
child.pid, child.pid,
signal.SIGINT, signal.SIGINT,
) )
from tractor._supervise import _shutdown_msg
expect( expect(
child, child,
'Shutting down actor runtime', # 'Shutting down actor runtime',
_shutdown_msg,
timeout=6, timeout=6,
) )
assert_before( assert_before(

View File

@ -0,0 +1,95 @@
'''
Verify the `enable_transports` param drives various
per-root/sub-actor IPC endpoint/server settings.
'''
from __future__ import annotations
import pytest
import trio
import tractor
from tractor import (
Actor,
Portal,
ipc,
msg,
_state,
_addr,
)
@tractor.context
async def chk_tpts(
ctx: tractor.Context,
tpt_proto_key: str,
):
rtvars = _state._runtime_vars
assert (
tpt_proto_key
in
rtvars['_enable_tpts']
)
actor: Actor = tractor.current_actor()
spec: msg.types.SpawnSpec = actor._spawn_spec
assert spec._runtime_vars == rtvars
# ensure individual IPC ep-addr types
serv: ipc._server.Server = actor.ipc_server
addr: ipc._types.Address
for addr in serv.addrs:
assert addr.proto_key == tpt_proto_key
# Actor delegate-props enforcement
assert (
actor.accept_addrs
==
serv.accept_addrs
)
await ctx.started(serv.accept_addrs)
# TODO, parametrize over mis-matched-proto-typed `registry_addrs`
# since i seems to work in `piker` but not exactly sure if both tcp
# & uds are being deployed then?
#
@pytest.mark.parametrize(
'tpt_proto_key',
['tcp', 'uds'],
ids=lambda item: f'ipc_tpt={item!r}'
)
def test_root_passes_tpt_to_sub(
tpt_proto_key: str,
reg_addr: tuple,
debug_mode: bool,
):
async def main():
async with tractor.open_nursery(
enable_transports=[tpt_proto_key],
registry_addrs=[reg_addr],
debug_mode=debug_mode,
) as an:
assert (
tpt_proto_key
in
_state._runtime_vars['_enable_tpts']
)
ptl: Portal = await an.start_actor(
name='sub',
enable_modules=[__name__],
)
async with ptl.open_context(
chk_tpts,
tpt_proto_key=tpt_proto_key,
) as (ctx, accept_addrs):
uw_addr: tuple
for uw_addr in accept_addrs:
addr = _addr.wrap_address(uw_addr)
assert addr.is_valid
# shudown sub-actor(s)
await an.cancel()
trio.run(main)

View File

@ -118,6 +118,10 @@ class Portal:
@property @property
def chan(self) -> Channel: def chan(self) -> Channel:
'''
Ref to this ctx's underlying `tractor.ipc.Channel`.
'''
return self._chan return self._chan
@property @property
@ -177,10 +181,17 @@ class Portal:
# not expecting a "main" result # not expecting a "main" result
if self._expect_result_ctx is None: if self._expect_result_ctx is None:
peer_id: str = f'{self.channel.aid.reprol()!r}'
log.warning( log.warning(
f"Portal for {self.channel.aid} not expecting a final" f'Portal to peer {peer_id} will not deliver a final result?\n'
" result?\nresult() should only be called if subactor" f'\n'
" was spawned with `ActorNursery.run_in_actor()`") f'Context.result() can only be called by the parent of '
f'a sub-actor when it was spawned with '
f'`ActorNursery.run_in_actor()`'
f'\n'
f'Further this `ActorNursery`-method-API will deprecated in the'
f'near fututre!\n'
)
return NoResult return NoResult
# expecting a "main" result # expecting a "main" result
@ -213,6 +224,7 @@ class Portal:
typname: str = type(self).__name__ typname: str = type(self).__name__
log.warning( log.warning(
f'`{typname}.result()` is DEPRECATED!\n' f'`{typname}.result()` is DEPRECATED!\n'
f'\n'
f'Use `{typname}.wait_for_result()` instead!\n' f'Use `{typname}.wait_for_result()` instead!\n'
) )
return await self.wait_for_result( return await self.wait_for_result(
@ -224,8 +236,10 @@ class Portal:
# terminate all locally running async generator # terminate all locally running async generator
# IPC calls # IPC calls
if self._streams: if self._streams:
log.cancel( peer_id: str = f'{self.channel.aid.reprol()!r}'
f"Cancelling all streams with {self.channel.aid}") report: str = (
f'Cancelling all msg-streams with {peer_id}\n'
)
for stream in self._streams.copy(): for stream in self._streams.copy():
try: try:
await stream.aclose() await stream.aclose()
@ -234,10 +248,18 @@ class Portal:
# (unless of course at some point down the road we # (unless of course at some point down the road we
# won't expect this to always be the case or need to # won't expect this to always be the case or need to
# detect it for respawning purposes?) # detect it for respawning purposes?)
log.debug(f"{stream} was already closed.") report += (
f'->) {stream!r} already closed\n'
)
log.cancel(report)
async def aclose(self): async def aclose(self):
log.debug(f"Closing {self}") log.debug(
f'Closing portal\n'
f'>}}\n'
f'|_{self}\n'
)
# TODO: once we move to implementing our own `ReceiveChannel` # TODO: once we move to implementing our own `ReceiveChannel`
# (including remote task cancellation inside its `.aclose()`) # (including remote task cancellation inside its `.aclose()`)
# we'll need to .aclose all those channels here # we'll need to .aclose all those channels here
@ -263,19 +285,18 @@ class Portal:
__runtimeframe__: int = 1 # noqa __runtimeframe__: int = 1 # noqa
chan: Channel = self.channel chan: Channel = self.channel
peer_id: str = f'{self.channel.aid.reprol()!r}'
if not chan.connected(): if not chan.connected():
log.runtime( log.runtime(
'This channel is already closed, skipping cancel request..' 'Peer {peer_id} is already disconnected\n'
'-> skipping cancel request..\n'
) )
return False return False
reminfo: str = (
f'c)=> {self.channel.aid}\n'
f' |_{chan}\n'
)
log.cancel( log.cancel(
f'Requesting actor-runtime cancel for peer\n\n' f'Sending actor-runtime-cancel-req to peer\n'
f'{reminfo}' f'\n'
f'c)=> {peer_id}\n'
) )
# XXX the one spot we set it? # XXX the one spot we set it?
@ -300,8 +321,9 @@ class Portal:
# may timeout and we never get an ack (obvi racy) # may timeout and we never get an ack (obvi racy)
# but that doesn't mean it wasn't cancelled. # but that doesn't mean it wasn't cancelled.
log.debug( log.debug(
'May have failed to cancel peer?\n' f'May have failed to cancel peer?\n'
f'{reminfo}' f'\n'
f'c)=?> {peer_id}\n'
) )
# if we get here some weird cancellation case happened # if we get here some weird cancellation case happened
@ -319,22 +341,22 @@ class Portal:
TransportClosed, TransportClosed,
) as tpt_err: ) as tpt_err:
report: str = ( ipc_borked_report: str = (
f'IPC chan for actor already closed or broken?\n\n' f'IPC for actor already closed/broken?\n\n'
f'{self.channel.aid}\n' f'\n'
f' |_{self.channel}\n' f'c)=x> {peer_id}\n'
) )
match tpt_err: match tpt_err:
case TransportClosed(): case TransportClosed():
log.debug(report) log.debug(ipc_borked_report)
case _: case _:
report += ( ipc_borked_report += (
f'\n' f'\n'
f'Unhandled low-level transport-closed/error during\n' f'Unhandled low-level transport-closed/error during\n'
f'Portal.cancel_actor()` request?\n' f'Portal.cancel_actor()` request?\n'
f'<{type(tpt_err).__name__}( {tpt_err} )>\n' f'<{type(tpt_err).__name__}( {tpt_err} )>\n'
) )
log.warning(report) log.warning(ipc_borked_report)
return False return False
@ -491,10 +513,13 @@ class Portal:
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await ctx.cancel() await ctx.cancel()
except trio.ClosedResourceError: except trio.ClosedResourceError as cre:
# if the far end terminates before we send a cancel the # if the far end terminates before we send a cancel the
# underlying transport-channel may already be closed. # underlying transport-channel may already be closed.
log.cancel(f'Context {ctx} was already closed?') log.cancel(
f'Context.cancel() -> {cre!r}\n'
f'cid: {ctx.cid!r} already closed?\n'
)
# XXX: should this always be done? # XXX: should this always be done?
# await recv_chan.aclose() # await recv_chan.aclose()

View File

@ -97,7 +97,7 @@ async def maybe_block_bp(
): ):
logger.info( logger.info(
f'Found `greenback` installed @ {maybe_mod}\n' f'Found `greenback` installed @ {maybe_mod}\n'
'Enabling `tractor.pause_from_sync()` support!\n' f'Enabling `tractor.pause_from_sync()` support!\n'
) )
os.environ['PYTHONBREAKPOINT'] = ( os.environ['PYTHONBREAKPOINT'] = (
'tractor.devx.debug._sync_pause_from_builtin' 'tractor.devx.debug._sync_pause_from_builtin'
@ -471,7 +471,7 @@ async def open_root_actor(
'-> Opening new registry @ ' '-> Opening new registry @ '
+ +
'\n'.join( '\n'.join(
f'@{addr}' for addr in reg_addrs f'{addr}' for addr in reg_addrs
) )
) )
logger.info(f'{report}\n') logger.info(f'{report}\n')
@ -543,7 +543,7 @@ async def open_root_actor(
raise raise
finally: finally:
# NOTE: not sure if we'll ever need this but it's # NOTE/TODO?, not sure if we'll ever need this but it's
# possibly better for even more determinism? # possibly better for even more determinism?
# logger.cancel( # logger.cancel(
# f'Waiting on {len(nurseries)} nurseries in root..') # f'Waiting on {len(nurseries)} nurseries in root..')

View File

@ -552,6 +552,14 @@ class Actor:
) )
raise raise
# ?TODO, factor this meth-iface into a new `.rpc` subsys primitive?
# - _get_rpc_func(),
# - _deliver_ctx_payload(),
# - get_context(),
# - start_remote_task(),
# - cancel_rpc_tasks(),
# - _cancel_task(),
#
def _get_rpc_func(self, ns, funcname): def _get_rpc_func(self, ns, funcname):
''' '''
Try to lookup and return a target RPC func from the Try to lookup and return a target RPC func from the
@ -1119,14 +1127,6 @@ class Actor:
self._cancel_complete.set() self._cancel_complete.set()
return True return True
# XXX: hard kill logic if needed?
# def _hard_mofo_kill(self):
# # If we're the root actor or zombied kill everything
# if self._parent_chan is None: # TODO: more robust check
# root = trio.lowlevel.current_root_task()
# for n in root.child_nurseries:
# n.cancel_scope.cancel()
async def _cancel_task( async def _cancel_task(
self, self,
cid: str, cid: str,
@ -1361,25 +1361,13 @@ class Actor:
''' '''
return self.accept_addrs[0] return self.accept_addrs[0]
def get_parent(self) -> Portal: # TODO, this should delegate ONLY to the
''' # `._spawn_spec._runtime_vars: dict` / `._state` APIs?
Return a `Portal` to our parent. #
# XXX, AH RIGHT that's why..
''' # it's bc we pass this as a CLI flag to the child.py precisely
assert self._parent_chan, "No parent channel for this actor?" # bc we need the bootstrapping pre `async_main()`.. but maybe
return Portal(self._parent_chan) # keep this as an impl deat and not part of the pub iface impl?
def get_chans(
self,
uid: tuple[str, str],
) -> list[Channel]:
'''
Return all IPC channels to the actor with provided `uid`.
'''
return self._ipc_server._peers[uid]
def is_infected_aio(self) -> bool: def is_infected_aio(self) -> bool:
''' '''
If `True`, this actor is running `trio` in guest mode on If `True`, this actor is running `trio` in guest mode on
@ -1390,6 +1378,23 @@ class Actor:
''' '''
return self._infected_aio return self._infected_aio
# ?TODO, is this the right type for this method?
def get_parent(self) -> Portal:
'''
Return a `Portal` to our parent.
'''
assert self._parent_chan, "No parent channel for this actor?"
return Portal(self._parent_chan)
# XXX: hard kill logic if needed?
# def _hard_mofo_kill(self):
# # If we're the root actor or zombied kill everything
# if self._parent_chan is None: # TODO: more robust check
# root = trio.lowlevel.current_root_task()
# for n in root.child_nurseries:
# n.cancel_scope.cancel()
async def async_main( async def async_main(
actor: Actor, actor: Actor,

View File

@ -34,9 +34,9 @@ from typing import (
import trio import trio
from trio import TaskStatus from trio import TaskStatus
from .devx.debug import ( from .devx import (
maybe_wait_for_debugger, debug,
acquire_debug_lock, pformat as _pformat
) )
from tractor._state import ( from tractor._state import (
current_actor, current_actor,
@ -51,14 +51,17 @@ from tractor._portal import Portal
from tractor._runtime import Actor from tractor._runtime import Actor
from tractor._entry import _mp_main from tractor._entry import _mp_main
from tractor._exceptions import ActorFailure from tractor._exceptions import ActorFailure
from tractor.msg.types import ( from tractor.msg import (
Aid, types as msgtypes,
SpawnSpec, pretty_struct,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
from ipc import IPCServer from ipc import (
_server,
Channel,
)
from ._supervise import ActorNursery from ._supervise import ActorNursery
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process) ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
@ -328,12 +331,13 @@ async def soft_kill(
see `.hard_kill()`). see `.hard_kill()`).
''' '''
peer_aid: Aid = portal.channel.aid chan: Channel = portal.channel
peer_aid: msgtypes.Aid = chan.aid
try: try:
log.cancel( log.cancel(
f'Soft killing sub-actor via portal request\n' f'Soft killing sub-actor via portal request\n'
f'\n' f'\n'
f'(c=> {peer_aid}\n' f'c)=> {peer_aid.reprol()}@[{chan.maddr}]\n'
f' |_{proc}\n' f' |_{proc}\n'
) )
# wait on sub-proc to signal termination # wait on sub-proc to signal termination
@ -341,7 +345,7 @@ async def soft_kill(
except trio.Cancelled: except trio.Cancelled:
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await maybe_wait_for_debugger( await debug.maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get( child_in_debug=_runtime_vars.get(
'_debug_mode', False '_debug_mode', False
), ),
@ -465,7 +469,7 @@ async def trio_proc(
"--uid", "--uid",
# TODO, how to pass this over "wire" encodings like # TODO, how to pass this over "wire" encodings like
# cmdline args? # cmdline args?
# -[ ] maybe we can add an `Aid.min_tuple()` ? # -[ ] maybe we can add an `msgtypes.Aid.min_tuple()` ?
str(subactor.uid), str(subactor.uid),
# Address the child must connect to on startup # Address the child must connect to on startup
"--parent_addr", "--parent_addr",
@ -483,12 +487,13 @@ async def trio_proc(
cancelled_during_spawn: bool = False cancelled_during_spawn: bool = False
proc: trio.Process|None = None proc: trio.Process|None = None
ipc_server: IPCServer = actor_nursery._actor.ipc_server ipc_server: _server.Server = actor_nursery._actor.ipc_server
try: try:
try: try:
proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs) proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs)
log.runtime( log.runtime(
'Started new child\n' f'Started new child subproc\n'
f'(>\n'
f' |_{proc}\n' f' |_{proc}\n'
) )
@ -507,10 +512,10 @@ async def trio_proc(
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb # don't clobber an ongoing pdb
if is_root_process(): if is_root_process():
await maybe_wait_for_debugger() await debug.maybe_wait_for_debugger()
elif proc is not None: elif proc is not None:
async with acquire_debug_lock(subactor.uid): async with debug.acquire_debug_lock(subactor.uid):
# soft wait on the proc to terminate # soft wait on the proc to terminate
with trio.move_on_after(0.5): with trio.move_on_after(0.5):
await proc.wait() await proc.wait()
@ -528,14 +533,19 @@ async def trio_proc(
# send a "spawning specification" which configures the # send a "spawning specification" which configures the
# initial runtime state of the child. # initial runtime state of the child.
sspec = SpawnSpec( sspec = msgtypes.SpawnSpec(
_parent_main_data=subactor._parent_main_data, _parent_main_data=subactor._parent_main_data,
enable_modules=subactor.enable_modules, enable_modules=subactor.enable_modules,
reg_addrs=subactor.reg_addrs, reg_addrs=subactor.reg_addrs,
bind_addrs=bind_addrs, bind_addrs=bind_addrs,
_runtime_vars=_runtime_vars, _runtime_vars=_runtime_vars,
) )
log.runtime(f'Sending spawn spec: {str(sspec)}') log.runtime(
f'Sending spawn spec to child\n'
f'{{}}=> {chan.aid.reprol()!r}\n'
f'\n'
f'{pretty_struct.pformat(sspec)}\n'
)
await chan.send(sspec) await chan.send(sspec)
# track subactor in current nursery # track subactor in current nursery
@ -563,7 +573,7 @@ async def trio_proc(
# condition. # condition.
await soft_kill( await soft_kill(
proc, proc,
trio.Process.wait, trio.Process.wait, # XXX, uses `pidfd_open()` below.
portal portal
) )
@ -571,8 +581,7 @@ async def trio_proc(
# tandem if not done already # tandem if not done already
log.cancel( log.cancel(
'Cancelling portal result reaper task\n' 'Cancelling portal result reaper task\n'
f'>c)\n' f'c)> {subactor.aid.reprol()!r}\n'
f' |_{subactor.uid}\n'
) )
nursery.cancel_scope.cancel() nursery.cancel_scope.cancel()
@ -581,21 +590,24 @@ async def trio_proc(
# allowed! Do this **after** cancellation/teardown to avoid # allowed! Do this **after** cancellation/teardown to avoid
# killing the process too early. # killing the process too early.
if proc: if proc:
reap_repr: str = _pformat.nest_from_op(
input_op='>x)',
text=subactor.pformat(),
)
log.cancel( log.cancel(
f'Hard reap sequence starting for subactor\n' f'Hard reap sequence starting for subactor\n'
f'>x)\n' f'{reap_repr}'
f' |_{subactor}@{subactor.uid}\n'
) )
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb # don't clobber an ongoing pdb
if cancelled_during_spawn: if cancelled_during_spawn:
# Try again to avoid TTY clobbering. # Try again to avoid TTY clobbering.
async with acquire_debug_lock(subactor.uid): async with debug.acquire_debug_lock(subactor.uid):
with trio.move_on_after(0.5): with trio.move_on_after(0.5):
await proc.wait() await proc.wait()
await maybe_wait_for_debugger( await debug.maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get( child_in_debug=_runtime_vars.get(
'_debug_mode', False '_debug_mode', False
), ),
@ -624,7 +636,7 @@ async def trio_proc(
# acquire the lock and get notified of who has it, # acquire the lock and get notified of who has it,
# check that uid against our known children? # check that uid against our known children?
# this_uid: tuple[str, str] = current_actor().uid # this_uid: tuple[str, str] = current_actor().uid
# await acquire_debug_lock(this_uid) # await debug.acquire_debug_lock(this_uid)
if proc.poll() is None: if proc.poll() is None:
log.cancel(f"Attempting to hard kill {proc}") log.cancel(f"Attempting to hard kill {proc}")
@ -727,7 +739,7 @@ async def mp_proc(
log.runtime(f"Started {proc}") log.runtime(f"Started {proc}")
ipc_server: IPCServer = actor_nursery._actor.ipc_server ipc_server: _server.Server = actor_nursery._actor.ipc_server
try: try:
# wait for actor to spawn and connect back to us # wait for actor to spawn and connect back to us
# channel should have handshake completed by the # channel should have handshake completed by the

View File

@ -789,6 +789,11 @@ def open_shm_list(
readonly=readonly, readonly=readonly,
) )
# TODO, factor into a @actor_fixture acm-API?
# -[ ] also `@maybe_actor_fixture()` which inludes
# the .current_actor() convenience check?
# |_ orr can that just be in the sin-maybe-version?
#
# "close" attached shm on actor teardown # "close" attached shm on actor teardown
try: try:
actor = tractor.current_actor() actor = tractor.current_actor()

View File

@ -18,6 +18,7 @@ TCP implementation of tractor.ipc._transport.MsgTransport protocol
''' '''
from __future__ import annotations from __future__ import annotations
import ipaddress
from typing import ( from typing import (
ClassVar, ClassVar,
) )
@ -50,13 +51,45 @@ class TCPAddress(
_host: str _host: str
_port: int _port: int
def __post_init__(self):
try:
ipaddress.ip_address(self._host)
except ValueError as valerr:
raise ValueError(
'Invalid {type(self).__name__}._host = {self._host!r}\n'
) from valerr
proto_key: ClassVar[str] = 'tcp' proto_key: ClassVar[str] = 'tcp'
unwrapped_type: ClassVar[type] = tuple[str, int] unwrapped_type: ClassVar[type] = tuple[str, int]
def_bindspace: ClassVar[str] = '127.0.0.1' def_bindspace: ClassVar[str] = '127.0.0.1'
# ?TODO, actually validate ipv4/6 with stdlib's `ipaddress`
@property @property
def is_valid(self) -> bool: def is_valid(self) -> bool:
return self._port != 0 '''
Predicate to ensure a valid socket-address pair.
'''
return (
self._port != 0
and
(ipaddr := ipaddress.ip_address(self._host))
and not (
ipaddr.is_reserved
or
ipaddr.is_unspecified
or
ipaddr.is_link_local
or
ipaddr.is_link_local
or
ipaddr.is_multicast
or
ipaddr.is_global
)
)
# ^XXX^ see various properties of invalid addrs here,
# https://docs.python.org/3/library/ipaddress.html#ipaddress.IPv4Address
@property @property
def bindspace(self) -> str: def bindspace(self) -> str:

View File

@ -210,12 +210,14 @@ class PldRx(Struct):
match msg: match msg:
case Return()|Error(): case Return()|Error():
log.runtime( log.runtime(
f'Rxed final outcome msg\n' f'Rxed final-outcome msg\n'
f'\n'
f'{msg}\n' f'{msg}\n'
) )
case Stop(): case Stop():
log.runtime( log.runtime(
f'Rxed stream stopped msg\n' f'Rxed stream stopped msg\n'
f'\n'
f'{msg}\n' f'{msg}\n'
) )
if passthrough_non_pld_msgs: if passthrough_non_pld_msgs:
@ -261,8 +263,9 @@ class PldRx(Struct):
if ( if (
type(msg) is Return type(msg) is Return
): ):
log.info( log.runtime(
f'Rxed final result msg\n' f'Rxed final result msg\n'
f'\n'
f'{msg}\n' f'{msg}\n'
) )
return self.decode_pld( return self.decode_pld(
@ -304,10 +307,13 @@ class PldRx(Struct):
try: try:
pld: PayloadT = self._pld_dec.decode(pld) pld: PayloadT = self._pld_dec.decode(pld)
log.runtime( log.runtime(
'Decoded msg payload\n\n' f'Decoded payload for\n'
# f'\n'
f'{msg}\n' f'{msg}\n'
f'where payload decoded as\n' # ^TODO?, ideally just render with `,
f'|_pld={pld!r}\n' # pld={decode}` in the `msg.pformat()`??
f'where, '
f'{type(msg).__name__}.pld={pld!r}\n'
) )
return pld return pld
except TypeError as typerr: except TypeError as typerr:
@ -494,7 +500,8 @@ def limit_plds(
finally: finally:
log.runtime( log.runtime(
'Reverted to previous payload-decoder\n\n' f'Reverted to previous payload-decoder\n'
f'\n'
f'{orig_pldec}\n' f'{orig_pldec}\n'
) )
# sanity on orig settings # sanity on orig settings
@ -629,7 +636,8 @@ async def drain_to_final_msg(
(local_cs := rent_n.cancel_scope).cancel_called (local_cs := rent_n.cancel_scope).cancel_called
): ):
log.cancel( log.cancel(
'RPC-ctx cancelled by local-parent scope during drain!\n\n' f'RPC-ctx cancelled by local-parent scope during drain!\n'
f'\n'
f'c}}>\n' f'c}}>\n'
f' |_{rent_n}\n' f' |_{rent_n}\n'
f' |_.cancel_scope = {local_cs}\n' f' |_.cancel_scope = {local_cs}\n'
@ -663,7 +671,8 @@ async def drain_to_final_msg(
# final result arrived! # final result arrived!
case Return(): case Return():
log.runtime( log.runtime(
'Context delivered final draining msg:\n' f'Context delivered final draining msg\n'
f'\n'
f'{pretty_struct.pformat(msg)}' f'{pretty_struct.pformat(msg)}'
) )
ctx._result: Any = pld ctx._result: Any = pld
@ -697,12 +706,14 @@ async def drain_to_final_msg(
): ):
log.cancel( log.cancel(
'Cancelling `MsgStream` drain since ' 'Cancelling `MsgStream` drain since '
f'{reason}\n\n' f'{reason}\n'
f'\n'
f'<= {ctx.chan.uid}\n' f'<= {ctx.chan.uid}\n'
f' |_{ctx._nsf}()\n\n' f' |_{ctx._nsf}()\n'
f'\n'
f'=> {ctx._task}\n' f'=> {ctx._task}\n'
f' |_{ctx._stream}\n\n' f' |_{ctx._stream}\n'
f'\n'
f'{pretty_struct.pformat(msg)}\n' f'{pretty_struct.pformat(msg)}\n'
) )
break break
@ -739,7 +750,8 @@ async def drain_to_final_msg(
case Stop(): case Stop():
pre_result_drained.append(msg) pre_result_drained.append(msg)
log.runtime( # normal/expected shutdown transaction log.runtime( # normal/expected shutdown transaction
'Remote stream terminated due to "stop" msg:\n\n' f'Remote stream terminated due to "stop" msg\n'
f'\n'
f'{pretty_struct.pformat(msg)}\n' f'{pretty_struct.pformat(msg)}\n'
) )
continue continue
@ -814,7 +826,8 @@ async def drain_to_final_msg(
else: else:
log.cancel( log.cancel(
'Skipping `MsgStream` drain since final outcome is set\n\n' f'Skipping `MsgStream` drain since final outcome is set\n'
f'\n'
f'{ctx.outcome}\n' f'{ctx.outcome}\n'
) )