Use `Address` where possible in (root) actor boot

Namely inside various bootup-sequences in `._root` and `._runtime`
particularly in the root actor to support both better tpt-address
denoting in our logging and as part of clarifying logic around setting
the root's registry addresses which is soon to be much better factored
out of the core and into an explicit subsystem + API.

Some `_root.open_root_actor()` deats,
- set `registry_addrs` to a new `uw_reg_addrs` (uw: unwrapped) to be
  more explicit about wrapped addr types thoughout.
- instead ensure `registry_addrs` are the wrapped types and pass down
  into the root `Actor` singleton-instance.
- factor the root-actor check + rt-vars update (updating the `'_root_addrs'`)
  out of `._runtime.async_main()` into this fn.
- as previous, set `trans_bind_addrs = uw_reg_addrs` in unwrapped form since it will
  be passed down both through rt-vars as `'_root_addrs'` and to
  `._runtim.async_main()` as `accept_addrs` (which is then passed to the
  IPC server).
- adjust/simplify much logging.
- shield the `await actor.cancel(None)  # self cancel` to avoid any
  finally-footguns.
- as mentioned convert the

For `_runtime.async_main()` tweaks,
- expect `registry_addrs: list[Address]|None = None` with appropriate
  unwrapping prior to setting both `.reg_addrs` and the equiv rt-var.
- add a new `.registry_addrs` prop for the wrapped form.
- convert a final loose-eg for the `service_nursery` to use
  `collapse_eg()`.
- simplify teardown report logging.
free_threading_prep
Tyler Goodlet 2025-07-07 10:37:02 -04:00
parent f86f4ae48d
commit ae91310b32
2 changed files with 118 additions and 69 deletions

View File

@ -202,7 +202,9 @@ async def open_root_actor(
'''
# XXX NEVER allow nested actor-trees!
if already_actor := _state.current_actor(err_on_no_runtime=False):
if already_actor := _state.current_actor(
err_on_no_runtime=False,
):
rtvs: dict[str, Any] = _state._runtime_vars
root_mailbox: list[str, int] = rtvs['_root_mailbox']
registry_addrs: list[list[str, int]] = rtvs['_registry_addrs']
@ -272,14 +274,20 @@ async def open_root_actor(
DeprecationWarning,
stacklevel=2,
)
registry_addrs = [arbiter_addr]
uw_reg_addrs = [arbiter_addr]
if not registry_addrs:
registry_addrs: list[UnwrappedAddress] = default_lo_addrs(
uw_reg_addrs = registry_addrs
if not uw_reg_addrs:
uw_reg_addrs: list[UnwrappedAddress] = default_lo_addrs(
enable_transports
)
assert registry_addrs
# must exist by now since all below code is dependent
assert uw_reg_addrs
registry_addrs: list[Address] = [
wrap_address(uw_addr)
for uw_addr in uw_reg_addrs
]
loglevel = (
loglevel
@ -328,10 +336,10 @@ async def open_root_actor(
enable_stack_on_sig()
# closed into below ping task-func
ponged_addrs: list[UnwrappedAddress] = []
ponged_addrs: list[Address] = []
async def ping_tpt_socket(
addr: UnwrappedAddress,
addr: Address,
timeout: float = 1,
) -> None:
'''
@ -351,17 +359,22 @@ async def open_root_actor(
# be better to eventually have a "discovery" protocol
# with basic handshake instead?
with trio.move_on_after(timeout):
async with _connect_chan(addr):
async with _connect_chan(addr.unwrap()):
ponged_addrs.append(addr)
except OSError:
# TODO: make this a "discovery" log level?
# ?TODO, make this a "discovery" log level?
logger.info(
f'No actor registry found @ {addr}\n'
f'No root-actor registry found @ {addr!r}\n'
)
# !TODO, this is basically just another (abstract)
# happy-eyeballs, so we should try for formalize it somewhere
# in a `.[_]discovery` ya?
#
async with trio.open_nursery() as tn:
for addr in registry_addrs:
for uw_addr in uw_reg_addrs:
addr: Address = wrap_address(uw_addr)
tn.start_soon(
ping_tpt_socket,
addr,
@ -390,24 +403,28 @@ async def open_root_actor(
loglevel=loglevel,
enable_modules=enable_modules,
)
# DO NOT use the registry_addrs as the transport server
# addrs for this new non-registar, root-actor.
# **DO NOT** use the registry_addrs as the
# ipc-transport-server's bind-addrs as this is
# a new NON-registrar, ROOT-actor.
#
# XXX INSTEAD, bind random addrs using the same tpt
# proto.
for addr in ponged_addrs:
waddr: Address = wrap_address(addr)
trans_bind_addrs.append(
waddr.get_random(bindspace=waddr.bindspace)
addr.get_random(
bindspace=addr.bindspace,
)
)
# Start this local actor as the "registrar", aka a regular
# actor who manages the local registry of "mailboxes" of
# other process-tree-local sub-actors.
else:
# NOTE that if the current actor IS THE REGISTAR, the
# following init steps are taken:
# - the tranport layer server is bound to each addr
# pair defined in provided registry_addrs, or the default.
trans_bind_addrs = registry_addrs
trans_bind_addrs = uw_reg_addrs
# - it is normally desirable for any registrar to stay up
# indefinitely until either all registered (child/sub)
@ -431,6 +448,16 @@ async def open_root_actor(
# `.trio.run()`.
actor._infected_aio = _state._runtime_vars['_is_infected_aio']
# NOTE, only set the loopback addr for the
# process-tree-global "root" mailbox since all sub-actors
# should be able to speak to their root actor over that
# channel.
raddrs: list[Address] = _state._runtime_vars['_root_addrs']
raddrs.extend(trans_bind_addrs)
# TODO, remove once we have also removed all usage;
# eventually all (root-)registry apis should expect > 1 addr.
_state._runtime_vars['_root_mailbox'] = raddrs[0]
# Start up main task set via core actor-runtime nurseries.
try:
# assign process-local actor
@ -438,20 +465,26 @@ async def open_root_actor(
# start local channel-server and fake the portal API
# NOTE: this won't block since we provide the nursery
ml_addrs_str: str = '\n'.join(
f'@{addr}' for addr in trans_bind_addrs
report: str = f'Starting actor-runtime for {actor.aid.reprol()!r}\n'
if reg_addrs := actor.registry_addrs:
report += (
'-> Opening new registry @ '
+
'\n'.join(
f'@{addr}' for addr in reg_addrs
)
logger.info(
f'Starting local {actor.uid} on the following transport addrs:\n'
f'{ml_addrs_str}'
)
logger.info(f'{report}\n')
# start runtime in a bg sub-task, yield to caller.
async with (
collapse_eg(),
trio.open_nursery() as root_tn,
):
# XXX, finally-footgun below?
# -> see note on why shielding.
# maybe_raise_from_masking_exc(),
):
# `_runtime.async_main()` creates an internal nursery
# and blocks here until any underlying actor(-process)
# tree has terminated thereby conducting so called
@ -526,8 +559,13 @@ async def open_root_actor(
)
logger.info(
f'Closing down root actor\n'
f'{op_nested_actor_repr}\n'
f'{op_nested_actor_repr}'
)
# XXX, THIS IS A *finally-footgun*!
# -> though already shields iternally it can
# taskc here and mask underlying errors raised in
# the try-block above?
with trio.CancelScope(shield=True):
await actor.cancel(None) # self cancel
finally:
# revert all process-global runtime state
@ -541,10 +579,16 @@ async def open_root_actor(
_state._current_actor = None
_state._last_actor_terminated = actor
logger.runtime(
sclang_repr: str = _pformat.nest_from_op(
input_op=')>',
text=actor.pformat(),
nest_prefix='|_',
nest_indent=1,
)
logger.info(
f'Root actor terminated\n'
f')>\n'
f' |_{actor}\n'
f'{sclang_repr}'
)

View File

@ -213,7 +213,7 @@ class Actor:
*,
enable_modules: list[str] = [],
loglevel: str|None = None,
registry_addrs: list[UnwrappedAddress]|None = None,
registry_addrs: list[Address]|None = None,
spawn_method: str|None = None,
# TODO: remove!
@ -256,11 +256,12 @@ class Actor:
if arbiter_addr is not None:
warnings.warn(
'`Actor(arbiter_addr=<blah>)` is now deprecated.\n'
'Use `registry_addrs: list[tuple]` instead.',
'Use `registry_addrs: list[Address]` instead.',
DeprecationWarning,
stacklevel=2,
)
registry_addrs: list[UnwrappedAddress] = [arbiter_addr]
registry_addrs: list[Address] = [wrap_address(arbiter_addr)]
# marked by the process spawning backend at startup
# will be None for the parent most process started manually
@ -299,8 +300,10 @@ class Actor:
# input via the validator.
self._reg_addrs: list[UnwrappedAddress] = []
if registry_addrs:
self.reg_addrs: list[UnwrappedAddress] = registry_addrs
_state._runtime_vars['_registry_addrs'] = registry_addrs
_state._runtime_vars['_registry_addrs'] = self.reg_addrs = [
addr.unwrap()
for addr in registry_addrs
]
@property
def aid(self) -> msgtypes.Aid:
@ -472,7 +475,11 @@ class Actor:
def reg_addrs(self) -> list[UnwrappedAddress]:
'''
List of (socket) addresses for all known (and contactable)
registry actors.
registry-service actors in "unwrapped" (i.e. IPC interchange
wire-compat) form.
If you are looking for the "wrapped" address form, use
`.registry_addrs` instead.
'''
return self._reg_addrs
@ -491,8 +498,14 @@ class Actor:
self._reg_addrs = addrs
@property
def registry_addrs(self) -> list[Address]:
return [wrap_address(uw_addr)
for uw_addr in self.reg_addrs]
def load_modules(
self,
) -> None:
'''
Load explicitly enabled python modules from local fs after
@ -1365,7 +1378,7 @@ class Actor:
Return all IPC channels to the actor with provided `uid`.
'''
return self._peers[uid]
return self._ipc_server._peers[uid]
def is_infected_aio(self) -> bool:
'''
@ -1420,6 +1433,8 @@ async def async_main(
# establish primary connection with immediate parent
actor._parent_chan: Channel|None = None
# is this a sub-actor?
# get runtime info from parent.
if parent_addr is not None:
(
actor._parent_chan,
@ -1464,10 +1479,8 @@ async def async_main(
ipc_server: _server.IPCServer
async with (
trio.open_nursery(
strict_exception_groups=False,
) as service_nursery,
collapse_eg(),
trio.open_nursery() as service_nursery,
_server.open_ipc_server(
parent_tn=service_nursery,
stream_handler_tn=service_nursery,
@ -1518,9 +1531,6 @@ async def async_main(
# TODO: why is this not with the root nursery?
try:
log.runtime(
'Booting IPC server'
)
eps: list = await ipc_server.listen_on(
accept_addrs=accept_addrs,
stream_handler_nursery=service_nursery,
@ -1552,18 +1562,6 @@ async def async_main(
# TODO, just read direct from ipc_server?
accept_addrs: list[UnwrappedAddress] = actor.accept_addrs
# NOTE: only set the loopback addr for the
# process-tree-global "root" mailbox since
# all sub-actors should be able to speak to
# their root actor over that channel.
if _state._runtime_vars['_is_root']:
raddrs: list[Address] = _state._runtime_vars['_root_addrs']
for addr in accept_addrs:
waddr: Address = wrap_address(addr)
raddrs.append(addr)
else:
_state._runtime_vars['_root_mailbox'] = raddrs[0]
# Register with the arbiter if we're told its addr
log.runtime(
f'Registering `{actor.name}` => {pformat(accept_addrs)}\n'
@ -1581,6 +1579,7 @@ async def async_main(
except AssertionError:
await debug.pause()
# !TODO, get rid of the local-portal crap XD
async with get_registry(addr) as reg_portal:
for accept_addr in accept_addrs:
accept_addr = wrap_address(accept_addr)
@ -1619,7 +1618,7 @@ async def async_main(
log.runtime(
'Service nursery complete\n'
'\n'
'-> Waiting on root nursery to complete'
'->} waiting on root nursery to complete..\n'
)
# Blocks here as expected until the root nursery is
@ -1674,6 +1673,7 @@ async def async_main(
finally:
teardown_report: str = (
'Main actor-runtime task completed\n'
'\n'
)
# ?TODO? should this be in `._entry`/`._root` mods instead?
@ -1715,7 +1715,8 @@ async def async_main(
# Unregister actor from the registry-sys / registrar.
if (
is_registered
and not actor.is_registrar
and
not actor.is_registrar
):
failed: bool = False
for addr in actor.reg_addrs:
@ -1750,7 +1751,8 @@ async def async_main(
ipc_server.has_peers(check_chans=True)
):
teardown_report += (
f'-> Waiting for remaining peers {ipc_server._peers} to clear..\n'
f'-> Waiting for remaining peers to clear..\n'
f' {pformat(ipc_server._peers)}'
)
log.runtime(teardown_report)
await ipc_server.wait_for_no_more_peers(
@ -1758,20 +1760,23 @@ async def async_main(
)
teardown_report += (
'-> All peer channels are complete\n'
'-]> all peer channels are complete.\n'
)
op_nested_actor_repr: str = _pformat.nest_from_op(
input_op=')>',
text=actor.pformat(),
nest_prefix='|_',
nest_indent=1, # under >
)
# op_nested_actor_repr: str = _pformat.nest_from_op(
# input_op=')>',
# text=actor.pformat(),
# nest_prefix='|_',
# nest_indent=1, # under >
# )
teardown_report += (
'Actor runtime exited\n'
f'{op_nested_actor_repr}'
'-)> actor runtime main task exit.\n'
# f'{op_nested_actor_repr}'
)
log.info(teardown_report)
# if _state._runtime_vars['_is_root']:
# log.info(teardown_report)
# else:
log.runtime(teardown_report)
# TODO: rename to `Registry` and move to `.discovery._registry`!