Compare commits

..

5 Commits

Author SHA1 Message Date
Tyler Goodlet 29db08b370 Use `nest_from_op()`/`pretty_struct` in `._rpc`
Again for nicer console logging. Also fix a double `req_chan` arg bug
when passed to `_invoke` in the `self.cancel()` rt-ep; don't update the
`kwargs: dict` just merge in `req_chan` input at call time.
2025-07-07 11:02:47 -04:00
Tyler Goodlet fe5e6e2ab0 Use `nest_from_op()` in actor-nursery shutdown
Including a new one-line `_shutdown_msg: str` which we mod-var-set for
testing usage and some denoising at `.info()` level. Adjust `Actor()`
instantiating input to the new `.registry_addrs` wrapped addrs property.
2025-07-07 10:59:00 -04:00
Tyler Goodlet ae91310b32 Use `Address` where possible in (root) actor boot
Namely inside various bootup-sequences in `._root` and `._runtime`
particularly in the root actor to support both better tpt-address
denoting in our logging and as part of clarifying logic around setting
the root's registry addresses which is soon to be much better factored
out of the core and into an explicit subsystem + API.

Some `_root.open_root_actor()` deats,
- set `registry_addrs` to a new `uw_reg_addrs` (uw: unwrapped) to be
  more explicit about wrapped addr types thoughout.
- instead ensure `registry_addrs` are the wrapped types and pass down
  into the root `Actor` singleton-instance.
- factor the root-actor check + rt-vars update (updating the `'_root_addrs'`)
  out of `._runtime.async_main()` into this fn.
- as previous, set `trans_bind_addrs = uw_reg_addrs` in unwrapped form since it will
  be passed down both through rt-vars as `'_root_addrs'` and to
  `._runtim.async_main()` as `accept_addrs` (which is then passed to the
  IPC server).
- adjust/simplify much logging.
- shield the `await actor.cancel(None)  # self cancel` to avoid any
  finally-footguns.
- as mentioned convert the

For `_runtime.async_main()` tweaks,
- expect `registry_addrs: list[Address]|None = None` with appropriate
  unwrapping prior to setting both `.reg_addrs` and the equiv rt-var.
- add a new `.registry_addrs` prop for the wrapped form.
- convert a final loose-eg for the `service_nursery` to use
  `collapse_eg()`.
- simplify teardown report logging.
2025-07-07 10:55:57 -04:00
Tyler Goodlet f86f4ae48d Facepalm, fix `raise from` in `collapse_eg()`
I dunno what exactly I was thinking but we definitely don't want to
**ever** raise from the original exc-group, instead always raise from
any original `.__cause__` to be consistent with the embedded src-error's
context.

Also, adjust `maybe_collapse_eg()` to return `False` in the non-single
`.exceptions` case, again don't know what I was trying to do but this
simplifies caller logic and the prior return-semantic had no real
value..

This fixes some final usage in the runtime (namely top level nursery
usage in `._root`/`._runtime`) which was previously causing test suite
failures prior to this fix.
2025-07-07 10:06:59 -04:00
Tyler Goodlet b244cf844d Add #TODO for `._context` to use `.msg.Aid` 2025-07-04 15:01:29 -04:00
6 changed files with 179 additions and 101 deletions

View File

@ -743,6 +743,8 @@ class Context:
# cancelled, NOT their reported canceller. IOW in the
# latter case we're cancelled by someone else getting
# cancelled.
#
# !TODO, switching to `Actor.aid` here!
if (canc := error.canceller) == self._actor.uid:
whom: str = 'us'
self._canceller = canc

View File

@ -202,7 +202,9 @@ async def open_root_actor(
'''
# XXX NEVER allow nested actor-trees!
if already_actor := _state.current_actor(err_on_no_runtime=False):
if already_actor := _state.current_actor(
err_on_no_runtime=False,
):
rtvs: dict[str, Any] = _state._runtime_vars
root_mailbox: list[str, int] = rtvs['_root_mailbox']
registry_addrs: list[list[str, int]] = rtvs['_registry_addrs']
@ -272,14 +274,20 @@ async def open_root_actor(
DeprecationWarning,
stacklevel=2,
)
registry_addrs = [arbiter_addr]
uw_reg_addrs = [arbiter_addr]
if not registry_addrs:
registry_addrs: list[UnwrappedAddress] = default_lo_addrs(
uw_reg_addrs = registry_addrs
if not uw_reg_addrs:
uw_reg_addrs: list[UnwrappedAddress] = default_lo_addrs(
enable_transports
)
assert registry_addrs
# must exist by now since all below code is dependent
assert uw_reg_addrs
registry_addrs: list[Address] = [
wrap_address(uw_addr)
for uw_addr in uw_reg_addrs
]
loglevel = (
loglevel
@ -328,10 +336,10 @@ async def open_root_actor(
enable_stack_on_sig()
# closed into below ping task-func
ponged_addrs: list[UnwrappedAddress] = []
ponged_addrs: list[Address] = []
async def ping_tpt_socket(
addr: UnwrappedAddress,
addr: Address,
timeout: float = 1,
) -> None:
'''
@ -351,17 +359,22 @@ async def open_root_actor(
# be better to eventually have a "discovery" protocol
# with basic handshake instead?
with trio.move_on_after(timeout):
async with _connect_chan(addr):
async with _connect_chan(addr.unwrap()):
ponged_addrs.append(addr)
except OSError:
# TODO: make this a "discovery" log level?
# ?TODO, make this a "discovery" log level?
logger.info(
f'No actor registry found @ {addr}\n'
f'No root-actor registry found @ {addr!r}\n'
)
# !TODO, this is basically just another (abstract)
# happy-eyeballs, so we should try for formalize it somewhere
# in a `.[_]discovery` ya?
#
async with trio.open_nursery() as tn:
for addr in registry_addrs:
for uw_addr in uw_reg_addrs:
addr: Address = wrap_address(uw_addr)
tn.start_soon(
ping_tpt_socket,
addr,
@ -390,24 +403,28 @@ async def open_root_actor(
loglevel=loglevel,
enable_modules=enable_modules,
)
# DO NOT use the registry_addrs as the transport server
# addrs for this new non-registar, root-actor.
# **DO NOT** use the registry_addrs as the
# ipc-transport-server's bind-addrs as this is
# a new NON-registrar, ROOT-actor.
#
# XXX INSTEAD, bind random addrs using the same tpt
# proto.
for addr in ponged_addrs:
waddr: Address = wrap_address(addr)
trans_bind_addrs.append(
waddr.get_random(bindspace=waddr.bindspace)
addr.get_random(
bindspace=addr.bindspace,
)
)
# Start this local actor as the "registrar", aka a regular
# actor who manages the local registry of "mailboxes" of
# other process-tree-local sub-actors.
else:
# NOTE that if the current actor IS THE REGISTAR, the
# following init steps are taken:
# - the tranport layer server is bound to each addr
# pair defined in provided registry_addrs, or the default.
trans_bind_addrs = registry_addrs
trans_bind_addrs = uw_reg_addrs
# - it is normally desirable for any registrar to stay up
# indefinitely until either all registered (child/sub)
@ -431,6 +448,16 @@ async def open_root_actor(
# `.trio.run()`.
actor._infected_aio = _state._runtime_vars['_is_infected_aio']
# NOTE, only set the loopback addr for the
# process-tree-global "root" mailbox since all sub-actors
# should be able to speak to their root actor over that
# channel.
raddrs: list[Address] = _state._runtime_vars['_root_addrs']
raddrs.extend(trans_bind_addrs)
# TODO, remove once we have also removed all usage;
# eventually all (root-)registry apis should expect > 1 addr.
_state._runtime_vars['_root_mailbox'] = raddrs[0]
# Start up main task set via core actor-runtime nurseries.
try:
# assign process-local actor
@ -438,20 +465,26 @@ async def open_root_actor(
# start local channel-server and fake the portal API
# NOTE: this won't block since we provide the nursery
ml_addrs_str: str = '\n'.join(
f'@{addr}' for addr in trans_bind_addrs
)
logger.info(
f'Starting local {actor.uid} on the following transport addrs:\n'
f'{ml_addrs_str}'
)
report: str = f'Starting actor-runtime for {actor.aid.reprol()!r}\n'
if reg_addrs := actor.registry_addrs:
report += (
'-> Opening new registry @ '
+
'\n'.join(
f'@{addr}' for addr in reg_addrs
)
)
logger.info(f'{report}\n')
# start runtime in a bg sub-task, yield to caller.
async with (
collapse_eg(),
trio.open_nursery() as root_tn,
):
# XXX, finally-footgun below?
# -> see note on why shielding.
# maybe_raise_from_masking_exc(),
):
# `_runtime.async_main()` creates an internal nursery
# and blocks here until any underlying actor(-process)
# tree has terminated thereby conducting so called
@ -526,9 +559,14 @@ async def open_root_actor(
)
logger.info(
f'Closing down root actor\n'
f'{op_nested_actor_repr}\n'
f'{op_nested_actor_repr}'
)
await actor.cancel(None) # self cancel
# XXX, THIS IS A *finally-footgun*!
# -> though already shields iternally it can
# taskc here and mask underlying errors raised in
# the try-block above?
with trio.CancelScope(shield=True):
await actor.cancel(None) # self cancel
finally:
# revert all process-global runtime state
if (
@ -541,10 +579,16 @@ async def open_root_actor(
_state._current_actor = None
_state._last_actor_terminated = actor
logger.runtime(
sclang_repr: str = _pformat.nest_from_op(
input_op=')>',
text=actor.pformat(),
nest_prefix='|_',
nest_indent=1,
)
logger.info(
f'Root actor terminated\n'
f')>\n'
f' |_{actor}\n'
f'{sclang_repr}'
)

View File

@ -672,7 +672,8 @@ async def _invoke(
ctx._result = res
log.runtime(
f'Sending result msg and exiting {ctx.side!r}\n'
f'{return_msg}\n'
f'\n'
f'{pretty_struct.pformat(return_msg)}\n'
)
await chan.send(return_msg)
@ -839,12 +840,12 @@ async def _invoke(
else:
descr_str += f'\n{merr!r}\n'
else:
descr_str += f'\nand final result {ctx.outcome!r}\n'
descr_str += f'\nwith final result {ctx.outcome!r}\n'
logmeth(
message
+
descr_str
f'{message}\n'
f'\n'
f'{descr_str}\n'
)
@ -1011,8 +1012,6 @@ async def process_messages(
cid=cid,
kwargs=kwargs,
):
kwargs |= {'req_chan': chan}
# XXX NOTE XXX don't start entire actor
# runtime cancellation if this actor is
# currently in debug mode!
@ -1031,14 +1030,14 @@ async def process_messages(
cid,
chan,
actor.cancel,
kwargs,
kwargs | {'req_chan': chan},
is_rpc=False,
return_msg_type=CancelAck,
)
log.runtime(
'Cancelling IPC transport msg-loop with peer:\n'
f'|_{chan}\n'
'Cancelling RPC-msg-loop with peer\n'
f'->c}} {chan.aid.reprol()}@[{chan.maddr}]\n'
)
loop_cs.cancel()
break
@ -1234,9 +1233,21 @@ async def process_messages(
# END-OF `async for`:
# IPC disconnected via `trio.EndOfChannel`, likely
# due to a (graceful) `Channel.aclose()`.
chan_op_repr: str = '<=x] '
chan_repr: str = _pformat.nest_from_op(
input_op=chan_op_repr,
op_suffix='',
nest_prefix='',
text=chan.pformat(),
nest_indent=len(chan_op_repr)-1,
rm_from_first_ln='<',
)
log.runtime(
f'channel for {chan.uid} disconnected, cancelling RPC tasks\n'
f'|_{chan}\n'
f'IPC channel disconnected\n'
f'{chan_repr}\n'
f'\n'
f'->c) cancelling RPC tasks.\n'
)
await actor.cancel_rpc_tasks(
req_aid=actor.aid,

View File

@ -213,7 +213,7 @@ class Actor:
*,
enable_modules: list[str] = [],
loglevel: str|None = None,
registry_addrs: list[UnwrappedAddress]|None = None,
registry_addrs: list[Address]|None = None,
spawn_method: str|None = None,
# TODO: remove!
@ -256,11 +256,12 @@ class Actor:
if arbiter_addr is not None:
warnings.warn(
'`Actor(arbiter_addr=<blah>)` is now deprecated.\n'
'Use `registry_addrs: list[tuple]` instead.',
'Use `registry_addrs: list[Address]` instead.',
DeprecationWarning,
stacklevel=2,
)
registry_addrs: list[UnwrappedAddress] = [arbiter_addr]
registry_addrs: list[Address] = [wrap_address(arbiter_addr)]
# marked by the process spawning backend at startup
# will be None for the parent most process started manually
@ -299,8 +300,10 @@ class Actor:
# input via the validator.
self._reg_addrs: list[UnwrappedAddress] = []
if registry_addrs:
self.reg_addrs: list[UnwrappedAddress] = registry_addrs
_state._runtime_vars['_registry_addrs'] = registry_addrs
_state._runtime_vars['_registry_addrs'] = self.reg_addrs = [
addr.unwrap()
for addr in registry_addrs
]
@property
def aid(self) -> msgtypes.Aid:
@ -472,7 +475,11 @@ class Actor:
def reg_addrs(self) -> list[UnwrappedAddress]:
'''
List of (socket) addresses for all known (and contactable)
registry actors.
registry-service actors in "unwrapped" (i.e. IPC interchange
wire-compat) form.
If you are looking for the "wrapped" address form, use
`.registry_addrs` instead.
'''
return self._reg_addrs
@ -491,8 +498,14 @@ class Actor:
self._reg_addrs = addrs
@property
def registry_addrs(self) -> list[Address]:
return [wrap_address(uw_addr)
for uw_addr in self.reg_addrs]
def load_modules(
self,
) -> None:
'''
Load explicitly enabled python modules from local fs after
@ -1365,7 +1378,7 @@ class Actor:
Return all IPC channels to the actor with provided `uid`.
'''
return self._peers[uid]
return self._ipc_server._peers[uid]
def is_infected_aio(self) -> bool:
'''
@ -1420,6 +1433,8 @@ async def async_main(
# establish primary connection with immediate parent
actor._parent_chan: Channel|None = None
# is this a sub-actor?
# get runtime info from parent.
if parent_addr is not None:
(
actor._parent_chan,
@ -1464,10 +1479,8 @@ async def async_main(
ipc_server: _server.IPCServer
async with (
trio.open_nursery(
strict_exception_groups=False,
) as service_nursery,
collapse_eg(),
trio.open_nursery() as service_nursery,
_server.open_ipc_server(
parent_tn=service_nursery,
stream_handler_tn=service_nursery,
@ -1518,9 +1531,6 @@ async def async_main(
# TODO: why is this not with the root nursery?
try:
log.runtime(
'Booting IPC server'
)
eps: list = await ipc_server.listen_on(
accept_addrs=accept_addrs,
stream_handler_nursery=service_nursery,
@ -1552,18 +1562,6 @@ async def async_main(
# TODO, just read direct from ipc_server?
accept_addrs: list[UnwrappedAddress] = actor.accept_addrs
# NOTE: only set the loopback addr for the
# process-tree-global "root" mailbox since
# all sub-actors should be able to speak to
# their root actor over that channel.
if _state._runtime_vars['_is_root']:
raddrs: list[Address] = _state._runtime_vars['_root_addrs']
for addr in accept_addrs:
waddr: Address = wrap_address(addr)
raddrs.append(addr)
else:
_state._runtime_vars['_root_mailbox'] = raddrs[0]
# Register with the arbiter if we're told its addr
log.runtime(
f'Registering `{actor.name}` => {pformat(accept_addrs)}\n'
@ -1581,6 +1579,7 @@ async def async_main(
except AssertionError:
await debug.pause()
# !TODO, get rid of the local-portal crap XD
async with get_registry(addr) as reg_portal:
for accept_addr in accept_addrs:
accept_addr = wrap_address(accept_addr)
@ -1619,7 +1618,7 @@ async def async_main(
log.runtime(
'Service nursery complete\n'
'\n'
'-> Waiting on root nursery to complete'
'->} waiting on root nursery to complete..\n'
)
# Blocks here as expected until the root nursery is
@ -1674,6 +1673,7 @@ async def async_main(
finally:
teardown_report: str = (
'Main actor-runtime task completed\n'
'\n'
)
# ?TODO? should this be in `._entry`/`._root` mods instead?
@ -1715,7 +1715,8 @@ async def async_main(
# Unregister actor from the registry-sys / registrar.
if (
is_registered
and not actor.is_registrar
and
not actor.is_registrar
):
failed: bool = False
for addr in actor.reg_addrs:
@ -1750,7 +1751,8 @@ async def async_main(
ipc_server.has_peers(check_chans=True)
):
teardown_report += (
f'-> Waiting for remaining peers {ipc_server._peers} to clear..\n'
f'-> Waiting for remaining peers to clear..\n'
f' {pformat(ipc_server._peers)}'
)
log.runtime(teardown_report)
await ipc_server.wait_for_no_more_peers(
@ -1758,20 +1760,23 @@ async def async_main(
)
teardown_report += (
'-> All peer channels are complete\n'
'-]> all peer channels are complete.\n'
)
op_nested_actor_repr: str = _pformat.nest_from_op(
input_op=')>',
text=actor.pformat(),
nest_prefix='|_',
nest_indent=1, # under >
)
# op_nested_actor_repr: str = _pformat.nest_from_op(
# input_op=')>',
# text=actor.pformat(),
# nest_prefix='|_',
# nest_indent=1, # under >
# )
teardown_report += (
'Actor runtime exited\n'
f'{op_nested_actor_repr}'
'-)> actor runtime main task exit.\n'
# f'{op_nested_actor_repr}'
)
log.info(teardown_report)
# if _state._runtime_vars['_is_root']:
# log.info(teardown_report)
# else:
log.runtime(teardown_report)
# TODO: rename to `Registry` and move to `.discovery._registry`!

View File

@ -21,7 +21,6 @@
from contextlib import asynccontextmanager as acm
from functools import partial
import inspect
from pprint import pformat
from typing import (
TYPE_CHECKING,
)
@ -31,7 +30,10 @@ import warnings
import trio
from .devx.debug import maybe_wait_for_debugger
from .devx import (
debug,
pformat as _pformat,
)
from ._addr import (
UnwrappedAddress,
mk_uuid,
@ -200,7 +202,7 @@ class ActorNursery:
loglevel=loglevel,
# verbatim relay this actor's registrar addresses
registry_addrs=current_actor().reg_addrs,
registry_addrs=current_actor().registry_addrs,
)
parent_addr: UnwrappedAddress = self._actor.accept_addr
assert parent_addr
@ -454,7 +456,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# the "hard join phase".
log.runtime(
'Waiting on subactors to complete:\n'
f'{pformat(an._children)}\n'
f'>}} {len(an._children)}\n'
)
an._join_procs.set()
@ -468,7 +470,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# will make the pdb repl unusable.
# Instead try to wait for pdb to be released before
# tearing down.
await maybe_wait_for_debugger(
await debug.maybe_wait_for_debugger(
child_in_debug=an._at_least_one_child_in_debug
)
@ -544,7 +546,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# XXX: yet another guard before allowing the cancel
# sequence in case a (single) child is in debug.
await maybe_wait_for_debugger(
await debug.maybe_wait_for_debugger(
child_in_debug=an._at_least_one_child_in_debug
)
@ -593,6 +595,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
# final exit
_shutdown_msg: str = (
'Actor-runtime-shutdown'
)
# @api_frame
@acm
async def open_nursery(
@ -681,17 +687,26 @@ async def open_nursery(
):
__tracebackhide__: bool = False
msg: str = (
'Actor-nursery exited\n'
f'|_{an}\n'
op_nested_an_repr: str = _pformat.nest_from_op(
input_op=')>',
text=f'{an}',
# nest_prefix='|_',
nest_indent=1, # under >
)
an_msg: str = (
f'Actor-nursery exited\n'
f'{op_nested_an_repr}\n'
)
# keep noise low during std operation.
log.runtime(an_msg)
if implicit_runtime:
# shutdown runtime if it was started and report noisly
# that we're did so.
msg += '=> Shutting down actor runtime <=\n'
msg: str = (
'\n'
'\n'
f'{_shutdown_msg} )>\n'
)
log.info(msg)
else:
# keep noise low during std operation.
log.runtime(msg)

View File

@ -31,7 +31,7 @@ import trio
def maybe_collapse_eg(
beg: BaseExceptionGroup,
) -> BaseException:
) -> BaseException|bool:
'''
If the input beg can collapse to a single non-eg sub-exception,
return it instead.
@ -40,13 +40,12 @@ def maybe_collapse_eg(
if len(excs := beg.exceptions) == 1:
return excs[0]
return beg
return False
@acm
async def collapse_eg(
hide_tb: bool = True,
raise_from_src: bool = False,
):
'''
If `BaseExceptionGroup` raised in the body scope is
@ -61,9 +60,11 @@ async def collapse_eg(
except* BaseException as beg:
if (
exc := maybe_collapse_eg(beg)
) is not beg:
from_exc = beg if raise_from_src else None
raise exc from from_exc
):
if cause := exc.__cause__:
raise exc from cause
raise exc
raise beg