Compare commits

...

9 Commits

Author SHA1 Message Date
Tyler Goodlet 4a9e8731f1 More prep-to-reduce the `Actor` method-iface
- drop the (never/un)used `.get_chans()`.
- add #TODO for factoring many methods into a new `.rpc`-subsys/pkg
  primitive, like an `RPCMngr/Server` type eventually.
- add todo to maybe mv `.get_parent()` elsewhere?
- move masked `._hard_mofo_kill()` to bottom.
2025-07-08 12:44:46 -04:00
Tyler Goodlet b6ffaea62f Mk `.ipc._tcp.TCPAddress` validate with `ipaddress`
Both via a post-init method to validate the original input `._host: str`
and in `.is_valid` to ensure the host-part isn't something, esoteric..
2025-07-08 12:42:34 -04:00
Tyler Goodlet 63bf967598 Add an `enable_transports` test-suite
Like it sounds, verifying that when that param is passed to the runtime
startup eps (`.open_root_actor()/.open_nursery()`), the appropriate
tpt-protocol is deployed for IPC (both the server and bound endpoints)
in both the root and any sub-actors (as passed down from rent to child
via the `.msg.types.SpawnSpec`).
2025-07-08 12:40:55 -04:00
Tyler Goodlet 011d033a12 Add `.ipc._shm` todo-idea for `@actor_fixture` API 2025-07-08 11:07:50 -04:00
Tyler Goodlet 76fb80fda6 Update buncha log msg fmting in `.msg._ops`
Mostly just multi-line code styling again: always putting standalone
`'f\n'` on separate LOC so it reads like it renders to console. Oh and
and a level drop to `.runtime()` for rx-msg reports.
2025-07-08 11:01:44 -04:00
Tyler Goodlet d50f7ba9ca Couple more `._root` logging tweaks.. 2025-07-07 23:13:14 -04:00
Tyler Goodlet 65ae2dc67c Update buncha log msg fmting in `._spawn`
Again using `Channel.aid.reprol()`, `.devx.pformat.nest_from_op()` and
 converting to multi-line code style an ' for str-report-contents. Tweak
 some imports to sub-mod level as well.
2025-07-07 19:11:01 -04:00
Tyler Goodlet 4be499fb1f Update buncha log msg fmting in `._portal`
Namely to use `Channel.aid.reprol()` and converting to our newer style
multi-line code style for str-reports.
2025-07-07 15:46:53 -04:00
Tyler Goodlet 7317bb269c Use `._supervise._shutdown_msg` in tooling test 2025-07-07 14:31:34 -04:00
9 changed files with 289 additions and 99 deletions

View File

@ -116,9 +116,11 @@ def test_shield_pause(
child.pid,
signal.SIGINT,
)
from tractor._supervise import _shutdown_msg
expect(
child,
'Shutting down actor runtime',
# 'Shutting down actor runtime',
_shutdown_msg,
timeout=6,
)
assert_before(

View File

@ -0,0 +1,95 @@
'''
Verify the `enable_transports` param drives various
per-root/sub-actor IPC endpoint/server settings.
'''
from __future__ import annotations
import pytest
import trio
import tractor
from tractor import (
Actor,
Portal,
ipc,
msg,
_state,
_addr,
)
@tractor.context
async def chk_tpts(
ctx: tractor.Context,
tpt_proto_key: str,
):
rtvars = _state._runtime_vars
assert (
tpt_proto_key
in
rtvars['_enable_tpts']
)
actor: Actor = tractor.current_actor()
spec: msg.types.SpawnSpec = actor._spawn_spec
assert spec._runtime_vars == rtvars
# ensure individual IPC ep-addr types
serv: ipc._server.Server = actor.ipc_server
addr: ipc._types.Address
for addr in serv.addrs:
assert addr.proto_key == tpt_proto_key
# Actor delegate-props enforcement
assert (
actor.accept_addrs
==
serv.accept_addrs
)
await ctx.started(serv.accept_addrs)
# TODO, parametrize over mis-matched-proto-typed `registry_addrs`
# since i seems to work in `piker` but not exactly sure if both tcp
# & uds are being deployed then?
#
@pytest.mark.parametrize(
'tpt_proto_key',
['tcp', 'uds'],
ids=lambda item: f'ipc_tpt={item!r}'
)
def test_root_passes_tpt_to_sub(
tpt_proto_key: str,
reg_addr: tuple,
debug_mode: bool,
):
async def main():
async with tractor.open_nursery(
enable_transports=[tpt_proto_key],
registry_addrs=[reg_addr],
debug_mode=debug_mode,
) as an:
assert (
tpt_proto_key
in
_state._runtime_vars['_enable_tpts']
)
ptl: Portal = await an.start_actor(
name='sub',
enable_modules=[__name__],
)
async with ptl.open_context(
chk_tpts,
tpt_proto_key=tpt_proto_key,
) as (ctx, accept_addrs):
uw_addr: tuple
for uw_addr in accept_addrs:
addr = _addr.wrap_address(uw_addr)
assert addr.is_valid
# shudown sub-actor(s)
await an.cancel()
trio.run(main)

View File

@ -118,6 +118,10 @@ class Portal:
@property
def chan(self) -> Channel:
'''
Ref to this ctx's underlying `tractor.ipc.Channel`.
'''
return self._chan
@property
@ -177,10 +181,17 @@ class Portal:
# not expecting a "main" result
if self._expect_result_ctx is None:
peer_id: str = f'{self.channel.aid.reprol()!r}'
log.warning(
f"Portal for {self.channel.aid} not expecting a final"
" result?\nresult() should only be called if subactor"
" was spawned with `ActorNursery.run_in_actor()`")
f'Portal to peer {peer_id} will not deliver a final result?\n'
f'\n'
f'Context.result() can only be called by the parent of '
f'a sub-actor when it was spawned with '
f'`ActorNursery.run_in_actor()`'
f'\n'
f'Further this `ActorNursery`-method-API will deprecated in the'
f'near fututre!\n'
)
return NoResult
# expecting a "main" result
@ -213,6 +224,7 @@ class Portal:
typname: str = type(self).__name__
log.warning(
f'`{typname}.result()` is DEPRECATED!\n'
f'\n'
f'Use `{typname}.wait_for_result()` instead!\n'
)
return await self.wait_for_result(
@ -224,8 +236,10 @@ class Portal:
# terminate all locally running async generator
# IPC calls
if self._streams:
log.cancel(
f"Cancelling all streams with {self.channel.aid}")
peer_id: str = f'{self.channel.aid.reprol()!r}'
report: str = (
f'Cancelling all msg-streams with {peer_id}\n'
)
for stream in self._streams.copy():
try:
await stream.aclose()
@ -234,10 +248,18 @@ class Portal:
# (unless of course at some point down the road we
# won't expect this to always be the case or need to
# detect it for respawning purposes?)
log.debug(f"{stream} was already closed.")
report += (
f'->) {stream!r} already closed\n'
)
log.cancel(report)
async def aclose(self):
log.debug(f"Closing {self}")
log.debug(
f'Closing portal\n'
f'>}}\n'
f'|_{self}\n'
)
# TODO: once we move to implementing our own `ReceiveChannel`
# (including remote task cancellation inside its `.aclose()`)
# we'll need to .aclose all those channels here
@ -263,19 +285,18 @@ class Portal:
__runtimeframe__: int = 1 # noqa
chan: Channel = self.channel
peer_id: str = f'{self.channel.aid.reprol()!r}'
if not chan.connected():
log.runtime(
'This channel is already closed, skipping cancel request..'
'Peer {peer_id} is already disconnected\n'
'-> skipping cancel request..\n'
)
return False
reminfo: str = (
f'c)=> {self.channel.aid}\n'
f' |_{chan}\n'
)
log.cancel(
f'Requesting actor-runtime cancel for peer\n\n'
f'{reminfo}'
f'Sending actor-runtime-cancel-req to peer\n'
f'\n'
f'c)=> {peer_id}\n'
)
# XXX the one spot we set it?
@ -300,8 +321,9 @@ class Portal:
# may timeout and we never get an ack (obvi racy)
# but that doesn't mean it wasn't cancelled.
log.debug(
'May have failed to cancel peer?\n'
f'{reminfo}'
f'May have failed to cancel peer?\n'
f'\n'
f'c)=?> {peer_id}\n'
)
# if we get here some weird cancellation case happened
@ -319,22 +341,22 @@ class Portal:
TransportClosed,
) as tpt_err:
report: str = (
f'IPC chan for actor already closed or broken?\n\n'
f'{self.channel.aid}\n'
f' |_{self.channel}\n'
ipc_borked_report: str = (
f'IPC for actor already closed/broken?\n\n'
f'\n'
f'c)=x> {peer_id}\n'
)
match tpt_err:
case TransportClosed():
log.debug(report)
log.debug(ipc_borked_report)
case _:
report += (
ipc_borked_report += (
f'\n'
f'Unhandled low-level transport-closed/error during\n'
f'Portal.cancel_actor()` request?\n'
f'<{type(tpt_err).__name__}( {tpt_err} )>\n'
)
log.warning(report)
log.warning(ipc_borked_report)
return False
@ -491,10 +513,13 @@ class Portal:
with trio.CancelScope(shield=True):
await ctx.cancel()
except trio.ClosedResourceError:
except trio.ClosedResourceError as cre:
# if the far end terminates before we send a cancel the
# underlying transport-channel may already be closed.
log.cancel(f'Context {ctx} was already closed?')
log.cancel(
f'Context.cancel() -> {cre!r}\n'
f'cid: {ctx.cid!r} already closed?\n'
)
# XXX: should this always be done?
# await recv_chan.aclose()

View File

@ -97,7 +97,7 @@ async def maybe_block_bp(
):
logger.info(
f'Found `greenback` installed @ {maybe_mod}\n'
'Enabling `tractor.pause_from_sync()` support!\n'
f'Enabling `tractor.pause_from_sync()` support!\n'
)
os.environ['PYTHONBREAKPOINT'] = (
'tractor.devx.debug._sync_pause_from_builtin'
@ -471,7 +471,7 @@ async def open_root_actor(
'-> Opening new registry @ '
+
'\n'.join(
f'@{addr}' for addr in reg_addrs
f'{addr}' for addr in reg_addrs
)
)
logger.info(f'{report}\n')
@ -543,7 +543,7 @@ async def open_root_actor(
raise
finally:
# NOTE: not sure if we'll ever need this but it's
# NOTE/TODO?, not sure if we'll ever need this but it's
# possibly better for even more determinism?
# logger.cancel(
# f'Waiting on {len(nurseries)} nurseries in root..')

View File

@ -552,6 +552,14 @@ class Actor:
)
raise
# ?TODO, factor this meth-iface into a new `.rpc` subsys primitive?
# - _get_rpc_func(),
# - _deliver_ctx_payload(),
# - get_context(),
# - start_remote_task(),
# - cancel_rpc_tasks(),
# - _cancel_task(),
#
def _get_rpc_func(self, ns, funcname):
'''
Try to lookup and return a target RPC func from the
@ -1119,14 +1127,6 @@ class Actor:
self._cancel_complete.set()
return True
# XXX: hard kill logic if needed?
# def _hard_mofo_kill(self):
# # If we're the root actor or zombied kill everything
# if self._parent_chan is None: # TODO: more robust check
# root = trio.lowlevel.current_root_task()
# for n in root.child_nurseries:
# n.cancel_scope.cancel()
async def _cancel_task(
self,
cid: str,
@ -1361,25 +1361,13 @@ class Actor:
'''
return self.accept_addrs[0]
def get_parent(self) -> Portal:
'''
Return a `Portal` to our parent.
'''
assert self._parent_chan, "No parent channel for this actor?"
return Portal(self._parent_chan)
def get_chans(
self,
uid: tuple[str, str],
) -> list[Channel]:
'''
Return all IPC channels to the actor with provided `uid`.
'''
return self._ipc_server._peers[uid]
# TODO, this should delegate ONLY to the
# `._spawn_spec._runtime_vars: dict` / `._state` APIs?
#
# XXX, AH RIGHT that's why..
# it's bc we pass this as a CLI flag to the child.py precisely
# bc we need the bootstrapping pre `async_main()`.. but maybe
# keep this as an impl deat and not part of the pub iface impl?
def is_infected_aio(self) -> bool:
'''
If `True`, this actor is running `trio` in guest mode on
@ -1390,6 +1378,23 @@ class Actor:
'''
return self._infected_aio
# ?TODO, is this the right type for this method?
def get_parent(self) -> Portal:
'''
Return a `Portal` to our parent.
'''
assert self._parent_chan, "No parent channel for this actor?"
return Portal(self._parent_chan)
# XXX: hard kill logic if needed?
# def _hard_mofo_kill(self):
# # If we're the root actor or zombied kill everything
# if self._parent_chan is None: # TODO: more robust check
# root = trio.lowlevel.current_root_task()
# for n in root.child_nurseries:
# n.cancel_scope.cancel()
async def async_main(
actor: Actor,

View File

@ -34,9 +34,9 @@ from typing import (
import trio
from trio import TaskStatus
from .devx.debug import (
maybe_wait_for_debugger,
acquire_debug_lock,
from .devx import (
debug,
pformat as _pformat
)
from tractor._state import (
current_actor,
@ -51,14 +51,17 @@ from tractor._portal import Portal
from tractor._runtime import Actor
from tractor._entry import _mp_main
from tractor._exceptions import ActorFailure
from tractor.msg.types import (
Aid,
SpawnSpec,
from tractor.msg import (
types as msgtypes,
pretty_struct,
)
if TYPE_CHECKING:
from ipc import IPCServer
from ipc import (
_server,
Channel,
)
from ._supervise import ActorNursery
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
@ -328,20 +331,21 @@ async def soft_kill(
see `.hard_kill()`).
'''
peer_aid: Aid = portal.channel.aid
chan: Channel = portal.channel
peer_aid: msgtypes.Aid = chan.aid
try:
log.cancel(
f'Soft killing sub-actor via portal request\n'
f'\n'
f'(c=> {peer_aid}\n'
f' |_{proc}\n'
f'c)=> {peer_aid.reprol()}@[{chan.maddr}]\n'
f' |_{proc}\n'
)
# wait on sub-proc to signal termination
await wait_func(proc)
except trio.Cancelled:
with trio.CancelScope(shield=True):
await maybe_wait_for_debugger(
await debug.maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get(
'_debug_mode', False
),
@ -465,7 +469,7 @@ async def trio_proc(
"--uid",
# TODO, how to pass this over "wire" encodings like
# cmdline args?
# -[ ] maybe we can add an `Aid.min_tuple()` ?
# -[ ] maybe we can add an `msgtypes.Aid.min_tuple()` ?
str(subactor.uid),
# Address the child must connect to on startup
"--parent_addr",
@ -483,13 +487,14 @@ async def trio_proc(
cancelled_during_spawn: bool = False
proc: trio.Process|None = None
ipc_server: IPCServer = actor_nursery._actor.ipc_server
ipc_server: _server.Server = actor_nursery._actor.ipc_server
try:
try:
proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs)
log.runtime(
'Started new child\n'
f'|_{proc}\n'
f'Started new child subproc\n'
f'(>\n'
f' |_{proc}\n'
)
# wait for actor to spawn and connect back to us
@ -507,10 +512,10 @@ async def trio_proc(
with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb
if is_root_process():
await maybe_wait_for_debugger()
await debug.maybe_wait_for_debugger()
elif proc is not None:
async with acquire_debug_lock(subactor.uid):
async with debug.acquire_debug_lock(subactor.uid):
# soft wait on the proc to terminate
with trio.move_on_after(0.5):
await proc.wait()
@ -528,14 +533,19 @@ async def trio_proc(
# send a "spawning specification" which configures the
# initial runtime state of the child.
sspec = SpawnSpec(
sspec = msgtypes.SpawnSpec(
_parent_main_data=subactor._parent_main_data,
enable_modules=subactor.enable_modules,
reg_addrs=subactor.reg_addrs,
bind_addrs=bind_addrs,
_runtime_vars=_runtime_vars,
)
log.runtime(f'Sending spawn spec: {str(sspec)}')
log.runtime(
f'Sending spawn spec to child\n'
f'{{}}=> {chan.aid.reprol()!r}\n'
f'\n'
f'{pretty_struct.pformat(sspec)}\n'
)
await chan.send(sspec)
# track subactor in current nursery
@ -563,7 +573,7 @@ async def trio_proc(
# condition.
await soft_kill(
proc,
trio.Process.wait,
trio.Process.wait, # XXX, uses `pidfd_open()` below.
portal
)
@ -571,8 +581,7 @@ async def trio_proc(
# tandem if not done already
log.cancel(
'Cancelling portal result reaper task\n'
f'>c)\n'
f' |_{subactor.uid}\n'
f'c)> {subactor.aid.reprol()!r}\n'
)
nursery.cancel_scope.cancel()
@ -581,21 +590,24 @@ async def trio_proc(
# allowed! Do this **after** cancellation/teardown to avoid
# killing the process too early.
if proc:
reap_repr: str = _pformat.nest_from_op(
input_op='>x)',
text=subactor.pformat(),
)
log.cancel(
f'Hard reap sequence starting for subactor\n'
f'>x)\n'
f' |_{subactor}@{subactor.uid}\n'
f'{reap_repr}'
)
with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb
if cancelled_during_spawn:
# Try again to avoid TTY clobbering.
async with acquire_debug_lock(subactor.uid):
async with debug.acquire_debug_lock(subactor.uid):
with trio.move_on_after(0.5):
await proc.wait()
await maybe_wait_for_debugger(
await debug.maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get(
'_debug_mode', False
),
@ -624,7 +636,7 @@ async def trio_proc(
# acquire the lock and get notified of who has it,
# check that uid against our known children?
# this_uid: tuple[str, str] = current_actor().uid
# await acquire_debug_lock(this_uid)
# await debug.acquire_debug_lock(this_uid)
if proc.poll() is None:
log.cancel(f"Attempting to hard kill {proc}")
@ -727,7 +739,7 @@ async def mp_proc(
log.runtime(f"Started {proc}")
ipc_server: IPCServer = actor_nursery._actor.ipc_server
ipc_server: _server.Server = actor_nursery._actor.ipc_server
try:
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the

View File

@ -789,6 +789,11 @@ def open_shm_list(
readonly=readonly,
)
# TODO, factor into a @actor_fixture acm-API?
# -[ ] also `@maybe_actor_fixture()` which inludes
# the .current_actor() convenience check?
# |_ orr can that just be in the sin-maybe-version?
#
# "close" attached shm on actor teardown
try:
actor = tractor.current_actor()

View File

@ -18,6 +18,7 @@ TCP implementation of tractor.ipc._transport.MsgTransport protocol
'''
from __future__ import annotations
import ipaddress
from typing import (
ClassVar,
)
@ -50,13 +51,45 @@ class TCPAddress(
_host: str
_port: int
def __post_init__(self):
try:
ipaddress.ip_address(self._host)
except ValueError as valerr:
raise ValueError(
'Invalid {type(self).__name__}._host = {self._host!r}\n'
) from valerr
proto_key: ClassVar[str] = 'tcp'
unwrapped_type: ClassVar[type] = tuple[str, int]
def_bindspace: ClassVar[str] = '127.0.0.1'
# ?TODO, actually validate ipv4/6 with stdlib's `ipaddress`
@property
def is_valid(self) -> bool:
return self._port != 0
'''
Predicate to ensure a valid socket-address pair.
'''
return (
self._port != 0
and
(ipaddr := ipaddress.ip_address(self._host))
and not (
ipaddr.is_reserved
or
ipaddr.is_unspecified
or
ipaddr.is_link_local
or
ipaddr.is_link_local
or
ipaddr.is_multicast
or
ipaddr.is_global
)
)
# ^XXX^ see various properties of invalid addrs here,
# https://docs.python.org/3/library/ipaddress.html#ipaddress.IPv4Address
@property
def bindspace(self) -> str:

View File

@ -210,12 +210,14 @@ class PldRx(Struct):
match msg:
case Return()|Error():
log.runtime(
f'Rxed final outcome msg\n'
f'Rxed final-outcome msg\n'
f'\n'
f'{msg}\n'
)
case Stop():
log.runtime(
f'Rxed stream stopped msg\n'
f'\n'
f'{msg}\n'
)
if passthrough_non_pld_msgs:
@ -261,8 +263,9 @@ class PldRx(Struct):
if (
type(msg) is Return
):
log.info(
log.runtime(
f'Rxed final result msg\n'
f'\n'
f'{msg}\n'
)
return self.decode_pld(
@ -304,10 +307,13 @@ class PldRx(Struct):
try:
pld: PayloadT = self._pld_dec.decode(pld)
log.runtime(
'Decoded msg payload\n\n'
f'Decoded payload for\n'
# f'\n'
f'{msg}\n'
f'where payload decoded as\n'
f'|_pld={pld!r}\n'
# ^TODO?, ideally just render with `,
# pld={decode}` in the `msg.pformat()`??
f'where, '
f'{type(msg).__name__}.pld={pld!r}\n'
)
return pld
except TypeError as typerr:
@ -494,7 +500,8 @@ def limit_plds(
finally:
log.runtime(
'Reverted to previous payload-decoder\n\n'
f'Reverted to previous payload-decoder\n'
f'\n'
f'{orig_pldec}\n'
)
# sanity on orig settings
@ -629,7 +636,8 @@ async def drain_to_final_msg(
(local_cs := rent_n.cancel_scope).cancel_called
):
log.cancel(
'RPC-ctx cancelled by local-parent scope during drain!\n\n'
f'RPC-ctx cancelled by local-parent scope during drain!\n'
f'\n'
f'c}}>\n'
f' |_{rent_n}\n'
f' |_.cancel_scope = {local_cs}\n'
@ -663,7 +671,8 @@ async def drain_to_final_msg(
# final result arrived!
case Return():
log.runtime(
'Context delivered final draining msg:\n'
f'Context delivered final draining msg\n'
f'\n'
f'{pretty_struct.pformat(msg)}'
)
ctx._result: Any = pld
@ -697,12 +706,14 @@ async def drain_to_final_msg(
):
log.cancel(
'Cancelling `MsgStream` drain since '
f'{reason}\n\n'
f'{reason}\n'
f'\n'
f'<= {ctx.chan.uid}\n'
f' |_{ctx._nsf}()\n\n'
f' |_{ctx._nsf}()\n'
f'\n'
f'=> {ctx._task}\n'
f' |_{ctx._stream}\n\n'
f' |_{ctx._stream}\n'
f'\n'
f'{pretty_struct.pformat(msg)}\n'
)
break
@ -739,7 +750,8 @@ async def drain_to_final_msg(
case Stop():
pre_result_drained.append(msg)
log.runtime( # normal/expected shutdown transaction
'Remote stream terminated due to "stop" msg:\n\n'
f'Remote stream terminated due to "stop" msg\n'
f'\n'
f'{pretty_struct.pformat(msg)}\n'
)
continue
@ -814,7 +826,8 @@ async def drain_to_final_msg(
else:
log.cancel(
'Skipping `MsgStream` drain since final outcome is set\n\n'
f'Skipping `MsgStream` drain since final outcome is set\n'
f'\n'
f'{ctx.outcome}\n'
)