diff --git a/tractor/_root.py b/tractor/_root.py index e9cac3f2..711bd442 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -47,10 +47,11 @@ from .ipc import ( _connect_chan, ) from ._addr import ( - AddressTypes, - wrap_address, + UnwrappedAddress, + default_lo_addrs, + mk_uuid, preferred_transport, - default_lo_addrs + wrap_address, ) from ._exceptions import is_multi_cancelled @@ -63,10 +64,10 @@ async def open_root_actor( *, # defaults are above - registry_addrs: list[AddressTypes]|None = None, + registry_addrs: list[UnwrappedAddress]|None = None, # defaults are above - arbiter_addr: tuple[AddressTypes]|None = None, + arbiter_addr: tuple[UnwrappedAddress]|None = None, enable_transports: list[str] = [preferred_transport], @@ -195,7 +196,9 @@ async def open_root_actor( registry_addrs = [arbiter_addr] if not registry_addrs: - registry_addrs: list[AddressTypes] = default_lo_addrs(enable_transports) + registry_addrs: list[UnwrappedAddress] = default_lo_addrs( + enable_transports + ) assert registry_addrs @@ -245,10 +248,10 @@ async def open_root_actor( enable_stack_on_sig() # closed into below ping task-func - ponged_addrs: list[AddressTypes] = [] + ponged_addrs: list[UnwrappedAddress] = [] async def ping_tpt_socket( - addr: AddressTypes, + addr: UnwrappedAddress, timeout: float = 1, ) -> None: ''' @@ -284,7 +287,7 @@ async def open_root_actor( addr, ) - trans_bind_addrs: list[AddressTypes] = [] + trans_bind_addrs: list[UnwrappedAddress] = [] # Create a new local root-actor instance which IS NOT THE # REGISTRAR @@ -302,6 +305,7 @@ async def open_root_actor( actor = Actor( name=name or 'anonymous', + uuid=mk_uuid(), registry_addrs=ponged_addrs, loglevel=loglevel, enable_modules=enable_modules, @@ -336,7 +340,8 @@ async def open_root_actor( # https://github.com/goodboy/tractor/issues/296 actor = Arbiter( - name or 'registrar', + name=name or 'registrar', + uuid=mk_uuid(), registry_addrs=registry_addrs, loglevel=loglevel, enable_modules=enable_modules, @@ -462,7 +467,7 @@ def run_daemon( # runtime kwargs name: str | None = 'root', - registry_addrs: list[AddressTypes]|None = None, + registry_addrs: list[UnwrappedAddress]|None = None, start_method: str | None = None, debug_mode: bool = False, diff --git a/tractor/_runtime.py b/tractor/_runtime.py index cb46e953..f0489814 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -52,6 +52,7 @@ import sys from typing import ( Any, Callable, + Type, TYPE_CHECKING, ) import uuid @@ -75,11 +76,12 @@ from tractor.msg import ( ) from .ipc import Channel from ._addr import ( - AddressTypes, + UnwrappedAddress, Address, - wrap_address, + default_lo_addrs, + get_address_cls, preferred_transport, - default_lo_addrs + wrap_address, ) from ._context import ( mk_context, @@ -182,15 +184,15 @@ class Actor: def __init__( self, name: str, + uuid: str, *, enable_modules: list[str] = [], - uid: str|None = None, loglevel: str|None = None, - registry_addrs: list[AddressTypes]|None = None, + registry_addrs: list[UnwrappedAddress]|None = None, spawn_method: str|None = None, # TODO: remove! - arbiter_addr: AddressTypes|None = None, + arbiter_addr: UnwrappedAddress|None = None, ) -> None: ''' @@ -199,10 +201,7 @@ class Actor: ''' self.name = name - self.uid = ( - name, - uid or str(uuid.uuid4()) - ) + self.uid = (name, uuid) self._cancel_complete = trio.Event() self._cancel_called_by_remote: tuple[str, tuple]|None = None @@ -230,7 +229,7 @@ class Actor: DeprecationWarning, stacklevel=2, ) - registry_addrs: list[AddressTypes] = [arbiter_addr] + registry_addrs: list[UnwrappedAddress] = [arbiter_addr] # marked by the process spawning backend at startup # will be None for the parent most process started manually @@ -277,13 +276,13 @@ class Actor: # when provided, init the registry addresses property from # input via the validator. - self._reg_addrs: list[AddressTypes] = [] + self._reg_addrs: list[UnwrappedAddress] = [] if registry_addrs: - self.reg_addrs: list[AddressTypes] = registry_addrs + self.reg_addrs: list[UnwrappedAddress] = registry_addrs _state._runtime_vars['_registry_addrs'] = registry_addrs @property - def reg_addrs(self) -> list[AddressTypes]: + def reg_addrs(self) -> list[UnwrappedAddress]: ''' List of (socket) addresses for all known (and contactable) registry actors. @@ -294,7 +293,7 @@ class Actor: @reg_addrs.setter def reg_addrs( self, - addrs: list[AddressTypes], + addrs: list[UnwrappedAddress], ) -> None: if not addrs: log.warning( @@ -1023,11 +1022,12 @@ class Actor: async def _from_parent( self, - parent_addr: AddressTypes|None, + parent_addr: UnwrappedAddress|None, ) -> tuple[ Channel, - list[AddressTypes]|None, + list[UnwrappedAddress]|None, + list[str]|None, # preferred tpts ]: ''' Bootstrap this local actor's runtime config from its parent by @@ -1039,30 +1039,58 @@ class Actor: # Connect back to the parent actor and conduct initial # handshake. From this point on if we error, we # attempt to ship the exception back to the parent. - chan = await Channel.from_addr(wrap_address(parent_addr)) + chan = await Channel.from_addr( + addr=wrap_address(parent_addr) + ) + assert isinstance(chan, Channel) # TODO: move this into a `Channel.handshake()`? # Initial handshake: swap names. await self._do_handshake(chan) - accept_addrs: list[AddressTypes]|None = None + accept_addrs: list[UnwrappedAddress]|None = None if self._spawn_method == "trio": # Receive post-spawn runtime state from our parent. spawnspec: msgtypes.SpawnSpec = await chan.recv() - self._spawn_spec = spawnspec + match spawnspec: + case MsgTypeError(): + raise spawnspec + case msgtypes.SpawnSpec(): + self._spawn_spec = spawnspec + log.runtime( + 'Received runtime spec from parent:\n\n' - log.runtime( - 'Received runtime spec from parent:\n\n' + # TODO: eventually all these msgs as + # `msgspec.Struct` with a special mode that + # pformats them in multi-line mode, BUT only + # if "trace"/"util" mode is enabled? + f'{pretty_struct.pformat(spawnspec)}\n' + ) - # TODO: eventually all these msgs as - # `msgspec.Struct` with a special mode that - # pformats them in multi-line mode, BUT only - # if "trace"/"util" mode is enabled? - f'{pretty_struct.pformat(spawnspec)}\n' - ) - accept_addrs: list[AddressTypes] = spawnspec.bind_addrs + case _: + raise InternalError( + f'Received invalid non-`SpawnSpec` payload !?\n' + f'{spawnspec}\n' + ) + + # ^^TODO XXX!! when the `SpawnSpec` fails to decode + # the above will raise a `MsgTypeError` which if we + # do NOT ALSO RAISE it will tried to be pprinted in + # the log.runtime() below.. + # + # SO we gotta look at how other `chan.recv()` calls + # are wrapped and do the same for this spec receive! + # -[ ] see `._rpc` likely has the answer? + # + # XXX NOTE, can't be called here in subactor + # bc we haven't yet received the + # `SpawnSpec._runtime_vars: dict` which would + # declare whether `debug_mode` is set! + # breakpoint() + # import pdbp; pdbp.set_trace() + accept_addrs: list[UnwrappedAddress] = spawnspec.bind_addrs # TODO: another `Struct` for rtvs.. rvs: dict[str, Any] = spawnspec._runtime_vars @@ -1154,6 +1182,9 @@ class Actor: return ( chan, accept_addrs, + None, + # ^TODO, preferred tpts list from rent! + # -[ ] need to extend the `SpawnSpec` tho! ) except OSError: # failed to connect @@ -1169,7 +1200,7 @@ class Actor: self, handler_nursery: Nursery, *, - listen_addrs: list[AddressTypes]|None = None, + listen_addrs: list[UnwrappedAddress]|None = None, task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED, ) -> None: @@ -1578,7 +1609,7 @@ class Actor: return False @property - def accept_addrs(self) -> list[AddressTypes]: + def accept_addrs(self) -> list[UnwrappedAddress]: ''' All addresses to which the transport-channel server binds and listens for new connections. @@ -1587,7 +1618,7 @@ class Actor: return [a.unwrap() for a in self._listen_addrs] @property - def accept_addr(self) -> AddressTypes: + def accept_addr(self) -> UnwrappedAddress: ''' Primary address to which the IPC transport server is bound and listening for new connections. @@ -1639,8 +1670,6 @@ class Actor: chan.aid = aid uid: tuple[str, str] = ( - # str(value[0]), - # str(value[1]) aid.name, aid.uuid, ) @@ -1664,7 +1693,7 @@ class Actor: async def async_main( actor: Actor, - accept_addrs: AddressTypes|None = None, + accept_addrs: UnwrappedAddress|None = None, # XXX: currently ``parent_addr`` is only needed for the # ``multiprocessing`` backend (which pickles state sent to @@ -1673,7 +1702,7 @@ async def async_main( # change this to a simple ``is_subactor: bool`` which will # be False when running as root actor and True when as # a subactor. - parent_addr: AddressTypes|None = None, + parent_addr: UnwrappedAddress|None = None, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: @@ -1702,16 +1731,31 @@ async def async_main( ( actor._parent_chan, set_accept_addr_says_rent, + maybe_preferred_transports_says_rent, ) = await actor._from_parent(parent_addr) + # either it's passed in because we're not a child or # because we're running in mp mode + accept_addrs: list[UnwrappedAddress] = [] if ( set_accept_addr_says_rent and set_accept_addr_says_rent is not None ): accept_addrs = set_accept_addr_says_rent + else: + enable_transports: list[str] = ( + maybe_preferred_transports_says_rent + or + [preferred_transport] + ) + for transport_key in enable_transports: + transport_cls: Type[Address] = get_address_cls( + transport_key + ) + addr: Address = transport_cls.get_random() + accept_addrs.append(addr.unwrap()) # The "root" nursery ensures the channel with the immediate # parent is kept alive as a resilient service until @@ -1779,7 +1823,7 @@ async def async_main( raise - accept_addrs: list[AddressTypes] = actor.accept_addrs + accept_addrs: list[UnwrappedAddress] = actor.accept_addrs # NOTE: only set the loopback addr for the # process-tree-global "root" mailbox since @@ -2028,7 +2072,7 @@ class Arbiter(Actor): self._registry: dict[ tuple[str, str], - AddressTypes, + UnwrappedAddress, ] = {} self._waiters: dict[ str, @@ -2044,7 +2088,7 @@ class Arbiter(Actor): self, name: str, - ) -> AddressTypes|None: + ) -> UnwrappedAddress|None: for uid, addr in self._registry.items(): if name in uid: @@ -2055,7 +2099,7 @@ class Arbiter(Actor): async def get_registry( self - ) -> dict[str, AddressTypes]: + ) -> dict[str, UnwrappedAddress]: ''' Return current name registry. @@ -2075,7 +2119,7 @@ class Arbiter(Actor): self, name: str, - ) -> list[AddressTypes]: + ) -> list[UnwrappedAddress]: ''' Wait for a particular actor to register. @@ -2083,8 +2127,8 @@ class Arbiter(Actor): registered. ''' - addrs: list[AddressTypes] = [] - addr: AddressTypes + addrs: list[UnwrappedAddress] = [] + addr: UnwrappedAddress mailbox_info: str = 'Actor registry contact infos:\n' for uid, addr in self._registry.items(): @@ -2110,7 +2154,7 @@ class Arbiter(Actor): async def register_actor( self, uid: tuple[str, str], - addr: AddressTypes + addr: UnwrappedAddress ) -> None: uid = name, hash = (str(uid[0]), str(uid[1])) waddr: Address = wrap_address(addr) diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 2a3842f7..e4017c44 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -22,7 +22,9 @@ from contextlib import asynccontextmanager as acm from functools import partial import inspect from pprint import pformat -from typing import TYPE_CHECKING +from typing import ( + TYPE_CHECKING, +) import typing import warnings @@ -31,9 +33,9 @@ import trio from .devx._debug import maybe_wait_for_debugger from ._addr import ( - AddressTypes, + UnwrappedAddress, preferred_transport, - get_address_cls + mk_uuid, ) from ._state import current_actor, is_main_process from .log import get_logger, get_loglevel @@ -134,7 +136,7 @@ class ActorNursery: *, - bind_addrs: list[AddressTypes]|None = None, + bind_addrs: list[UnwrappedAddress]|None = None, rpc_module_paths: list[str]|None = None, enable_transports: list[str] = [preferred_transport], enable_modules: list[str]|None = None, @@ -161,12 +163,6 @@ class ActorNursery: or get_loglevel() ) - if not bind_addrs: - bind_addrs: list[AddressTypes] = [ - get_address_cls(transport).get_random().unwrap() - for transport in enable_transports - ] - # configure and pass runtime state _rtv = _state._runtime_vars.copy() _rtv['_is_root'] = False @@ -189,7 +185,9 @@ class ActorNursery: enable_modules.extend(rpc_module_paths) subactor = Actor( - name, + name=name, + uuid=mk_uuid(), + # modules allowed to invoked funcs from enable_modules=enable_modules, loglevel=loglevel, @@ -197,7 +195,7 @@ class ActorNursery: # verbatim relay this actor's registrar addresses registry_addrs=current_actor().reg_addrs, ) - parent_addr = self._actor.accept_addr + parent_addr: UnwrappedAddress = self._actor.accept_addr assert parent_addr # start a task to spawn a process @@ -235,7 +233,7 @@ class ActorNursery: *, name: str | None = None, - bind_addrs: AddressTypes|None = None, + bind_addrs: UnwrappedAddress|None = None, rpc_module_paths: list[str] | None = None, enable_modules: list[str] | None = None, loglevel: str | None = None, # set log level per subactor