Allocate bind-addrs in subactors

Previously whenever an `ActorNursery.start_actor()` call did not receive
a `bind_addrs` arg we would allocate the default `(localhost, 0)` pairs
in the parent, for UDS this obviously won't work nor is it ideal bc it's
nicer to have the actor to be a socket server (who calls
`Address.open_listener()`) define the socket-file-name containing their
unique ID info such as pid, actor-uuid etc.

As such this moves "random" generation of server addresses to the
child-side of a subactor's spawn-sequence when it's sin-`bind_addrs`;
i.e. we do the allocation of the `Address.get_random()` addrs inside
`._runtime.async_main()` instead of `Portal.start_actor()` and **only
when** `accept_addrs`/`bind_addrs` was **not provided by the spawning
parent**.

Further this patch get's way more rigorous about the `SpawnSpec`
processing in the child inside `Actor._from_parent()` such that we
handle any invalid msgs **very loudly and pedantically!**

Impl deats,
- do the "random addr generation" in an explicit `for` loop (instead of
  prior comprehension) to allow for more detailed typing of the layered
  calls to the new `._addr` mod.
- use a `match:/case:` for process any invalid `SpawnSpec` payload case
  where we can instead receive a `MsgTypeError` from the `chan.recv()`
  call in `Actor._from_parent()` to raise it immediately instead of
  triggering downstream type-errors XD
  |_ as per the big `#TODO` we prolly want to take from other callers
     of `Channel.recv()` (like in the `._rpc.process_messages()` loop).
  |_ always raise `InternalError` on non-match/fall-through case!
  |_ add a note about not being able to use `breakpoint()` in this
     section due to causality of `SpawnSpec._runtime_vars` not having
     been processed yet..
  |_ always return a third element from `._from_rent()` eventually to be
     the `preferred_transports: list[str]` from the spawning rent.
- use new `._addr.mk_uuid()` and pass to new `Actor.__init__(uuid: str)`
  for all actor creation (including in all the mods tweaked here).
- Move to new type-alias-name `UnwrappedAddress` throughout.
structural_dynamics_of_flow
Tyler Goodlet 2025-03-30 21:36:45 -04:00
parent cb6c10bbe9
commit ddeab1355a
3 changed files with 115 additions and 68 deletions

View File

@ -47,10 +47,11 @@ from .ipc import (
_connect_chan, _connect_chan,
) )
from ._addr import ( from ._addr import (
AddressTypes, UnwrappedAddress,
wrap_address, default_lo_addrs,
mk_uuid,
preferred_transport, preferred_transport,
default_lo_addrs wrap_address,
) )
from ._exceptions import is_multi_cancelled from ._exceptions import is_multi_cancelled
@ -63,10 +64,10 @@ async def open_root_actor(
*, *,
# defaults are above # defaults are above
registry_addrs: list[AddressTypes]|None = None, registry_addrs: list[UnwrappedAddress]|None = None,
# defaults are above # defaults are above
arbiter_addr: tuple[AddressTypes]|None = None, arbiter_addr: tuple[UnwrappedAddress]|None = None,
enable_transports: list[str] = [preferred_transport], enable_transports: list[str] = [preferred_transport],
@ -195,7 +196,9 @@ async def open_root_actor(
registry_addrs = [arbiter_addr] registry_addrs = [arbiter_addr]
if not registry_addrs: if not registry_addrs:
registry_addrs: list[AddressTypes] = default_lo_addrs(enable_transports) registry_addrs: list[UnwrappedAddress] = default_lo_addrs(
enable_transports
)
assert registry_addrs assert registry_addrs
@ -245,10 +248,10 @@ async def open_root_actor(
enable_stack_on_sig() enable_stack_on_sig()
# closed into below ping task-func # closed into below ping task-func
ponged_addrs: list[AddressTypes] = [] ponged_addrs: list[UnwrappedAddress] = []
async def ping_tpt_socket( async def ping_tpt_socket(
addr: AddressTypes, addr: UnwrappedAddress,
timeout: float = 1, timeout: float = 1,
) -> None: ) -> None:
''' '''
@ -284,7 +287,7 @@ async def open_root_actor(
addr, addr,
) )
trans_bind_addrs: list[AddressTypes] = [] trans_bind_addrs: list[UnwrappedAddress] = []
# Create a new local root-actor instance which IS NOT THE # Create a new local root-actor instance which IS NOT THE
# REGISTRAR # REGISTRAR
@ -302,6 +305,7 @@ async def open_root_actor(
actor = Actor( actor = Actor(
name=name or 'anonymous', name=name or 'anonymous',
uuid=mk_uuid(),
registry_addrs=ponged_addrs, registry_addrs=ponged_addrs,
loglevel=loglevel, loglevel=loglevel,
enable_modules=enable_modules, enable_modules=enable_modules,
@ -336,7 +340,8 @@ async def open_root_actor(
# https://github.com/goodboy/tractor/issues/296 # https://github.com/goodboy/tractor/issues/296
actor = Arbiter( actor = Arbiter(
name or 'registrar', name=name or 'registrar',
uuid=mk_uuid(),
registry_addrs=registry_addrs, registry_addrs=registry_addrs,
loglevel=loglevel, loglevel=loglevel,
enable_modules=enable_modules, enable_modules=enable_modules,
@ -462,7 +467,7 @@ def run_daemon(
# runtime kwargs # runtime kwargs
name: str | None = 'root', name: str | None = 'root',
registry_addrs: list[AddressTypes]|None = None, registry_addrs: list[UnwrappedAddress]|None = None,
start_method: str | None = None, start_method: str | None = None,
debug_mode: bool = False, debug_mode: bool = False,

View File

@ -52,6 +52,7 @@ import sys
from typing import ( from typing import (
Any, Any,
Callable, Callable,
Type,
TYPE_CHECKING, TYPE_CHECKING,
) )
import uuid import uuid
@ -75,11 +76,12 @@ from tractor.msg import (
) )
from .ipc import Channel from .ipc import Channel
from ._addr import ( from ._addr import (
AddressTypes, UnwrappedAddress,
Address, Address,
wrap_address, default_lo_addrs,
get_address_cls,
preferred_transport, preferred_transport,
default_lo_addrs wrap_address,
) )
from ._context import ( from ._context import (
mk_context, mk_context,
@ -182,15 +184,15 @@ class Actor:
def __init__( def __init__(
self, self,
name: str, name: str,
uuid: str,
*, *,
enable_modules: list[str] = [], enable_modules: list[str] = [],
uid: str|None = None,
loglevel: str|None = None, loglevel: str|None = None,
registry_addrs: list[AddressTypes]|None = None, registry_addrs: list[UnwrappedAddress]|None = None,
spawn_method: str|None = None, spawn_method: str|None = None,
# TODO: remove! # TODO: remove!
arbiter_addr: AddressTypes|None = None, arbiter_addr: UnwrappedAddress|None = None,
) -> None: ) -> None:
''' '''
@ -199,10 +201,7 @@ class Actor:
''' '''
self.name = name self.name = name
self.uid = ( self.uid = (name, uuid)
name,
uid or str(uuid.uuid4())
)
self._cancel_complete = trio.Event() self._cancel_complete = trio.Event()
self._cancel_called_by_remote: tuple[str, tuple]|None = None self._cancel_called_by_remote: tuple[str, tuple]|None = None
@ -230,7 +229,7 @@ class Actor:
DeprecationWarning, DeprecationWarning,
stacklevel=2, stacklevel=2,
) )
registry_addrs: list[AddressTypes] = [arbiter_addr] registry_addrs: list[UnwrappedAddress] = [arbiter_addr]
# marked by the process spawning backend at startup # marked by the process spawning backend at startup
# will be None for the parent most process started manually # will be None for the parent most process started manually
@ -277,13 +276,13 @@ class Actor:
# when provided, init the registry addresses property from # when provided, init the registry addresses property from
# input via the validator. # input via the validator.
self._reg_addrs: list[AddressTypes] = [] self._reg_addrs: list[UnwrappedAddress] = []
if registry_addrs: if registry_addrs:
self.reg_addrs: list[AddressTypes] = registry_addrs self.reg_addrs: list[UnwrappedAddress] = registry_addrs
_state._runtime_vars['_registry_addrs'] = registry_addrs _state._runtime_vars['_registry_addrs'] = registry_addrs
@property @property
def reg_addrs(self) -> list[AddressTypes]: def reg_addrs(self) -> list[UnwrappedAddress]:
''' '''
List of (socket) addresses for all known (and contactable) List of (socket) addresses for all known (and contactable)
registry actors. registry actors.
@ -294,7 +293,7 @@ class Actor:
@reg_addrs.setter @reg_addrs.setter
def reg_addrs( def reg_addrs(
self, self,
addrs: list[AddressTypes], addrs: list[UnwrappedAddress],
) -> None: ) -> None:
if not addrs: if not addrs:
log.warning( log.warning(
@ -1023,11 +1022,12 @@ class Actor:
async def _from_parent( async def _from_parent(
self, self,
parent_addr: AddressTypes|None, parent_addr: UnwrappedAddress|None,
) -> tuple[ ) -> tuple[
Channel, Channel,
list[AddressTypes]|None, list[UnwrappedAddress]|None,
list[str]|None, # preferred tpts
]: ]:
''' '''
Bootstrap this local actor's runtime config from its parent by Bootstrap this local actor's runtime config from its parent by
@ -1039,30 +1039,58 @@ class Actor:
# Connect back to the parent actor and conduct initial # Connect back to the parent actor and conduct initial
# handshake. From this point on if we error, we # handshake. From this point on if we error, we
# attempt to ship the exception back to the parent. # attempt to ship the exception back to the parent.
chan = await Channel.from_addr(wrap_address(parent_addr)) chan = await Channel.from_addr(
addr=wrap_address(parent_addr)
)
assert isinstance(chan, Channel)
# TODO: move this into a `Channel.handshake()`? # TODO: move this into a `Channel.handshake()`?
# Initial handshake: swap names. # Initial handshake: swap names.
await self._do_handshake(chan) await self._do_handshake(chan)
accept_addrs: list[AddressTypes]|None = None accept_addrs: list[UnwrappedAddress]|None = None
if self._spawn_method == "trio": if self._spawn_method == "trio":
# Receive post-spawn runtime state from our parent. # Receive post-spawn runtime state from our parent.
spawnspec: msgtypes.SpawnSpec = await chan.recv() spawnspec: msgtypes.SpawnSpec = await chan.recv()
self._spawn_spec = spawnspec match spawnspec:
case MsgTypeError():
raise spawnspec
case msgtypes.SpawnSpec():
self._spawn_spec = spawnspec
log.runtime(
'Received runtime spec from parent:\n\n'
log.runtime( # TODO: eventually all these msgs as
'Received runtime spec from parent:\n\n' # `msgspec.Struct` with a special mode that
# pformats them in multi-line mode, BUT only
# if "trace"/"util" mode is enabled?
f'{pretty_struct.pformat(spawnspec)}\n'
)
# TODO: eventually all these msgs as case _:
# `msgspec.Struct` with a special mode that raise InternalError(
# pformats them in multi-line mode, BUT only f'Received invalid non-`SpawnSpec` payload !?\n'
# if "trace"/"util" mode is enabled? f'{spawnspec}\n'
f'{pretty_struct.pformat(spawnspec)}\n' )
)
accept_addrs: list[AddressTypes] = spawnspec.bind_addrs # ^^TODO XXX!! when the `SpawnSpec` fails to decode
# the above will raise a `MsgTypeError` which if we
# do NOT ALSO RAISE it will tried to be pprinted in
# the log.runtime() below..
#
# SO we gotta look at how other `chan.recv()` calls
# are wrapped and do the same for this spec receive!
# -[ ] see `._rpc` likely has the answer?
#
# XXX NOTE, can't be called here in subactor
# bc we haven't yet received the
# `SpawnSpec._runtime_vars: dict` which would
# declare whether `debug_mode` is set!
# breakpoint()
# import pdbp; pdbp.set_trace()
accept_addrs: list[UnwrappedAddress] = spawnspec.bind_addrs
# TODO: another `Struct` for rtvs.. # TODO: another `Struct` for rtvs..
rvs: dict[str, Any] = spawnspec._runtime_vars rvs: dict[str, Any] = spawnspec._runtime_vars
@ -1154,6 +1182,9 @@ class Actor:
return ( return (
chan, chan,
accept_addrs, accept_addrs,
None,
# ^TODO, preferred tpts list from rent!
# -[ ] need to extend the `SpawnSpec` tho!
) )
except OSError: # failed to connect except OSError: # failed to connect
@ -1169,7 +1200,7 @@ class Actor:
self, self,
handler_nursery: Nursery, handler_nursery: Nursery,
*, *,
listen_addrs: list[AddressTypes]|None = None, listen_addrs: list[UnwrappedAddress]|None = None,
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
@ -1578,7 +1609,7 @@ class Actor:
return False return False
@property @property
def accept_addrs(self) -> list[AddressTypes]: def accept_addrs(self) -> list[UnwrappedAddress]:
''' '''
All addresses to which the transport-channel server binds All addresses to which the transport-channel server binds
and listens for new connections. and listens for new connections.
@ -1587,7 +1618,7 @@ class Actor:
return [a.unwrap() for a in self._listen_addrs] return [a.unwrap() for a in self._listen_addrs]
@property @property
def accept_addr(self) -> AddressTypes: def accept_addr(self) -> UnwrappedAddress:
''' '''
Primary address to which the IPC transport server is Primary address to which the IPC transport server is
bound and listening for new connections. bound and listening for new connections.
@ -1639,8 +1670,6 @@ class Actor:
chan.aid = aid chan.aid = aid
uid: tuple[str, str] = ( uid: tuple[str, str] = (
# str(value[0]),
# str(value[1])
aid.name, aid.name,
aid.uuid, aid.uuid,
) )
@ -1664,7 +1693,7 @@ class Actor:
async def async_main( async def async_main(
actor: Actor, actor: Actor,
accept_addrs: AddressTypes|None = None, accept_addrs: UnwrappedAddress|None = None,
# XXX: currently ``parent_addr`` is only needed for the # XXX: currently ``parent_addr`` is only needed for the
# ``multiprocessing`` backend (which pickles state sent to # ``multiprocessing`` backend (which pickles state sent to
@ -1673,7 +1702,7 @@ async def async_main(
# change this to a simple ``is_subactor: bool`` which will # change this to a simple ``is_subactor: bool`` which will
# be False when running as root actor and True when as # be False when running as root actor and True when as
# a subactor. # a subactor.
parent_addr: AddressTypes|None = None, parent_addr: UnwrappedAddress|None = None,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
@ -1702,16 +1731,31 @@ async def async_main(
( (
actor._parent_chan, actor._parent_chan,
set_accept_addr_says_rent, set_accept_addr_says_rent,
maybe_preferred_transports_says_rent,
) = await actor._from_parent(parent_addr) ) = await actor._from_parent(parent_addr)
# either it's passed in because we're not a child or # either it's passed in because we're not a child or
# because we're running in mp mode # because we're running in mp mode
accept_addrs: list[UnwrappedAddress] = []
if ( if (
set_accept_addr_says_rent set_accept_addr_says_rent
and and
set_accept_addr_says_rent is not None set_accept_addr_says_rent is not None
): ):
accept_addrs = set_accept_addr_says_rent accept_addrs = set_accept_addr_says_rent
else:
enable_transports: list[str] = (
maybe_preferred_transports_says_rent
or
[preferred_transport]
)
for transport_key in enable_transports:
transport_cls: Type[Address] = get_address_cls(
transport_key
)
addr: Address = transport_cls.get_random()
accept_addrs.append(addr.unwrap())
# The "root" nursery ensures the channel with the immediate # The "root" nursery ensures the channel with the immediate
# parent is kept alive as a resilient service until # parent is kept alive as a resilient service until
@ -1779,7 +1823,7 @@ async def async_main(
raise raise
accept_addrs: list[AddressTypes] = actor.accept_addrs accept_addrs: list[UnwrappedAddress] = actor.accept_addrs
# NOTE: only set the loopback addr for the # NOTE: only set the loopback addr for the
# process-tree-global "root" mailbox since # process-tree-global "root" mailbox since
@ -2028,7 +2072,7 @@ class Arbiter(Actor):
self._registry: dict[ self._registry: dict[
tuple[str, str], tuple[str, str],
AddressTypes, UnwrappedAddress,
] = {} ] = {}
self._waiters: dict[ self._waiters: dict[
str, str,
@ -2044,7 +2088,7 @@ class Arbiter(Actor):
self, self,
name: str, name: str,
) -> AddressTypes|None: ) -> UnwrappedAddress|None:
for uid, addr in self._registry.items(): for uid, addr in self._registry.items():
if name in uid: if name in uid:
@ -2055,7 +2099,7 @@ class Arbiter(Actor):
async def get_registry( async def get_registry(
self self
) -> dict[str, AddressTypes]: ) -> dict[str, UnwrappedAddress]:
''' '''
Return current name registry. Return current name registry.
@ -2075,7 +2119,7 @@ class Arbiter(Actor):
self, self,
name: str, name: str,
) -> list[AddressTypes]: ) -> list[UnwrappedAddress]:
''' '''
Wait for a particular actor to register. Wait for a particular actor to register.
@ -2083,8 +2127,8 @@ class Arbiter(Actor):
registered. registered.
''' '''
addrs: list[AddressTypes] = [] addrs: list[UnwrappedAddress] = []
addr: AddressTypes addr: UnwrappedAddress
mailbox_info: str = 'Actor registry contact infos:\n' mailbox_info: str = 'Actor registry contact infos:\n'
for uid, addr in self._registry.items(): for uid, addr in self._registry.items():
@ -2110,7 +2154,7 @@ class Arbiter(Actor):
async def register_actor( async def register_actor(
self, self,
uid: tuple[str, str], uid: tuple[str, str],
addr: AddressTypes addr: UnwrappedAddress
) -> None: ) -> None:
uid = name, hash = (str(uid[0]), str(uid[1])) uid = name, hash = (str(uid[0]), str(uid[1]))
waddr: Address = wrap_address(addr) waddr: Address = wrap_address(addr)

View File

@ -22,7 +22,9 @@ from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
import inspect import inspect
from pprint import pformat from pprint import pformat
from typing import TYPE_CHECKING from typing import (
TYPE_CHECKING,
)
import typing import typing
import warnings import warnings
@ -31,9 +33,9 @@ import trio
from .devx._debug import maybe_wait_for_debugger from .devx._debug import maybe_wait_for_debugger
from ._addr import ( from ._addr import (
AddressTypes, UnwrappedAddress,
preferred_transport, preferred_transport,
get_address_cls mk_uuid,
) )
from ._state import current_actor, is_main_process from ._state import current_actor, is_main_process
from .log import get_logger, get_loglevel from .log import get_logger, get_loglevel
@ -134,7 +136,7 @@ class ActorNursery:
*, *,
bind_addrs: list[AddressTypes]|None = None, bind_addrs: list[UnwrappedAddress]|None = None,
rpc_module_paths: list[str]|None = None, rpc_module_paths: list[str]|None = None,
enable_transports: list[str] = [preferred_transport], enable_transports: list[str] = [preferred_transport],
enable_modules: list[str]|None = None, enable_modules: list[str]|None = None,
@ -161,12 +163,6 @@ class ActorNursery:
or get_loglevel() or get_loglevel()
) )
if not bind_addrs:
bind_addrs: list[AddressTypes] = [
get_address_cls(transport).get_random().unwrap()
for transport in enable_transports
]
# configure and pass runtime state # configure and pass runtime state
_rtv = _state._runtime_vars.copy() _rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False _rtv['_is_root'] = False
@ -189,7 +185,9 @@ class ActorNursery:
enable_modules.extend(rpc_module_paths) enable_modules.extend(rpc_module_paths)
subactor = Actor( subactor = Actor(
name, name=name,
uuid=mk_uuid(),
# modules allowed to invoked funcs from # modules allowed to invoked funcs from
enable_modules=enable_modules, enable_modules=enable_modules,
loglevel=loglevel, loglevel=loglevel,
@ -197,7 +195,7 @@ class ActorNursery:
# verbatim relay this actor's registrar addresses # verbatim relay this actor's registrar addresses
registry_addrs=current_actor().reg_addrs, registry_addrs=current_actor().reg_addrs,
) )
parent_addr = self._actor.accept_addr parent_addr: UnwrappedAddress = self._actor.accept_addr
assert parent_addr assert parent_addr
# start a task to spawn a process # start a task to spawn a process
@ -235,7 +233,7 @@ class ActorNursery:
*, *,
name: str | None = None, name: str | None = None,
bind_addrs: AddressTypes|None = None, bind_addrs: UnwrappedAddress|None = None,
rpc_module_paths: list[str] | None = None, rpc_module_paths: list[str] | None = None,
enable_modules: list[str] | None = None, enable_modules: list[str] | None = None,
loglevel: str | None = None, # set log level per subactor loglevel: str | None = None, # set log level per subactor