From 7ce4bc489e9fb3684f7ba1a9d2cda2e4257b5139 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 27 Sep 2023 15:19:30 -0400 Subject: [PATCH] Init-support for "multi homed" transports Since we'd like to eventually allow a diverse set of transport (protocol) methods and stacks, and a multi-peer discovery system for distributed actor-tree applications, this reworks all runtime internals to support multi-homing for any given tree on a logical host. In other words any actor can now bind its transport server (currently only unsecured TCP + `msgspec`) to more then one address available in its (linux) network namespace. Further, registry actors (now dubbed "registars" instead of "arbiters") can also similarly bind to multiple network addresses and provide discovery services to remote actors via multiple addresses which can now be provided at runtime startup. Deats: - adjust `._runtime` internals to use a `list[tuple[str, int]]` (and thus pluralized) socket address sequence where applicable for transport server socket binds, now exposed via `Actor.accept_addrs`: - `Actor.__init__()` now takes a `registry_addrs: list`. - `Actor.is_arbiter` -> `.is_registrar`. - `._arb_addr` -> `._reg_addrs: list[tuple]`. - always reg and de-reg from all registrars in `async_main()`. - only set the global runtime var `'_root_mailbox'` to the loopback address since normally all in-tree processes should have access to it, right? - `._serve_forever()` task now takes `listen_sockaddrs: list[tuple]` - make `open_root_actor()` take a `registry_addrs: list[tuple[str, int]]` and defaults when not passed. - change `ActorNursery.start_..()` methods take `bind_addrs: list` and pass down through the spawning layer(s) via the parent-seed-msg. - generalize all `._discovery()` APIs to accept `registry_addrs`-like inputs and move all relevant subsystems to adopt the "registry" style naming instead of "arbiter": - make `find_actor()` support batched concurrent portal queries over all provided input addresses using `.trionics.gather_contexts()` Bo - syntax: move to using `async with ` 3.9+ style chained @acms. - a general modernization of the code to a python 3.9+ style. - start deprecation and change to "registry" naming / semantics: - `._discovery.get_arbiter()` -> `.get_registry()` --- tractor/_discovery.py | 188 ++++++++++++++++++------- tractor/_entry.py | 10 +- tractor/_ipc.py | 4 +- tractor/_portal.py | 7 +- tractor/_root.py | 139 +++++++++++------- tractor/_runtime.py | 318 ++++++++++++++++++++++++++++-------------- tractor/_spawn.py | 21 ++- tractor/_supervise.py | 23 ++- 8 files changed, 481 insertions(+), 229 deletions(-) diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 03775ac2..22ab88d1 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -15,16 +15,19 @@ # along with this program. If not, see . """ -Actor discovery API. +Discovery (protocols) API for automatic addressing and location +management of (service) actors. """ +from __future__ import annotations from typing import ( - Optional, - Union, AsyncGenerator, + TYPE_CHECKING, ) from contextlib import asynccontextmanager as acm +import warnings +from .trionics import gather_contexts from ._ipc import _connect_chan, Channel from ._portal import ( Portal, @@ -34,13 +37,19 @@ from ._portal import ( from ._state import current_actor, _runtime_vars -@acm -async def get_arbiter( +if TYPE_CHECKING: + from ._runtime import Actor + +@acm +async def get_registry( host: str, port: int, -) -> AsyncGenerator[Union[Portal, LocalPortal], None]: +) -> AsyncGenerator[ + Portal | LocalPortal | None, + None, +]: ''' Return a portal instance connected to a local or remote arbiter. @@ -51,16 +60,23 @@ async def get_arbiter( if not actor: raise RuntimeError("No actor instance has been defined yet?") - if actor.is_arbiter: + if actor.is_registrar: # we're already the arbiter # (likely a re-entrant call from the arbiter actor) - yield LocalPortal(actor, Channel((host, port))) + yield LocalPortal( + actor, + Channel((host, port)) + ) else: - async with _connect_chan(host, port) as chan: + async with ( + _connect_chan(host, port) as chan, + open_portal(chan) as regstr_ptl, + ): + yield regstr_ptl - async with open_portal(chan) as arb_portal: - yield arb_portal +# TODO: deprecate and remove _arbiter form +get_arbiter = get_registry @acm @@ -68,51 +84,81 @@ async def get_root( **kwargs, ) -> AsyncGenerator[Portal, None]: + # TODO: rename mailbox to `_root_maddr` when we finally + # add and impl libp2p multi-addrs? host, port = _runtime_vars['_root_mailbox'] assert host is not None - async with _connect_chan(host, port) as chan: - async with open_portal(chan, **kwargs) as portal: - yield portal + async with ( + _connect_chan(host, port) as chan, + open_portal(chan, **kwargs) as portal, + ): + yield portal @acm async def query_actor( name: str, - arbiter_sockaddr: Optional[tuple[str, int]] = None, + arbiter_sockaddr: tuple[str, int] | None = None, + regaddr: tuple[str, int] | None = None, -) -> AsyncGenerator[tuple[str, int], None]: +) -> AsyncGenerator[ + tuple[str, int] | None, + None, +]: ''' - Simple address lookup for a given actor name. + Make a transport address lookup for an actor name to a specific + registrar. - Returns the (socket) address or ``None``. + Returns the (socket) address or ``None`` if no entry under that + name exists for the given registrar listening @ `regaddr`. ''' - actor = current_actor() - async with get_arbiter( - *arbiter_sockaddr or actor._arb_addr - ) as arb_portal: + actor: Actor = current_actor() + if ( + name == 'registrar' + and actor.is_registrar + ): + raise RuntimeError( + 'The current actor IS the registry!?' + ) - sockaddr = await arb_portal.run_from_ns( + if arbiter_sockaddr is not None: + warnings.warn( + '`tractor.query_actor(regaddr=)` is deprecated.\n' + 'Use `registry_addrs: list[tuple]` instead!', + DeprecationWarning, + stacklevel=2, + ) + regaddr: list[tuple[str, int]] = arbiter_sockaddr + + regstr: Portal + async with get_registry( + *(regaddr or actor._reg_addrs[0]) + ) as regstr: + + # TODO: return portals to all available actors - for now + # just the last one that registered + sockaddr: tuple[str, int] = await regstr.run_from_ns( 'self', 'find_actor', name=name, ) - - # TODO: return portals to all available actors - for now just - # the last one that registered - if name == 'arbiter' and actor.is_arbiter: - raise RuntimeError("The current actor is the arbiter") - - yield sockaddr if sockaddr else None + yield sockaddr @acm async def find_actor( name: str, - arbiter_sockaddr: tuple[str, int] | None = None + arbiter_sockaddr: tuple[str, int] | None = None, + registry_addrs: list[tuple[str, int]] | None = None, -) -> AsyncGenerator[Optional[Portal], None]: + only_first: bool = True, + +) -> AsyncGenerator[ + Portal | list[Portal] | None, + None, +]: ''' Ask the arbiter to find actor(s) by name. @@ -120,24 +166,54 @@ async def find_actor( known to the arbiter. ''' - async with query_actor( - name=name, - arbiter_sockaddr=arbiter_sockaddr, - ) as sockaddr: + if arbiter_sockaddr is not None: + warnings.warn( + '`tractor.find_actor(arbiter_sockaddr=)` is deprecated.\n' + 'Use `registry_addrs: list[tuple]` instead!', + DeprecationWarning, + stacklevel=2, + ) + registry_addrs: list[tuple[str, int]] = [arbiter_sockaddr] - if sockaddr: - async with _connect_chan(*sockaddr) as chan: - async with open_portal(chan) as portal: - yield portal - else: + @acm + async def maybe_open_portal_from_reg_addr( + addr: tuple[str, int], + ): + async with query_actor( + name=name, + regaddr=addr, + ) as sockaddr: + if sockaddr: + async with _connect_chan(*sockaddr) as chan: + async with open_portal(chan) as portal: + yield portal + else: + yield None + + async with gather_contexts( + mngrs=list( + maybe_open_portal_from_reg_addr(addr) + for addr in registry_addrs + ) + ) as maybe_portals: + print(f'Portalz: {maybe_portals}') + if not maybe_portals: yield None + return + + portals: list[Portal] = list(maybe_portals) + if only_first: + yield portals[0] + + else: + yield portals @acm async def wait_for_actor( name: str, arbiter_sockaddr: tuple[str, int] | None = None, - # registry_addr: tuple[str, int] | None = None, + registry_addr: tuple[str, int] | None = None, ) -> AsyncGenerator[Portal, None]: ''' @@ -146,17 +222,33 @@ async def wait_for_actor( A portal to the first registered actor is returned. ''' - actor = current_actor() + actor: Actor = current_actor() - async with get_arbiter( - *arbiter_sockaddr or actor._arb_addr, - ) as arb_portal: - sockaddrs = await arb_portal.run_from_ns( + if arbiter_sockaddr is not None: + warnings.warn( + '`tractor.wait_for_actor(arbiter_sockaddr=)` is deprecated.\n' + 'Use `registry_addr: tuple` instead!', + DeprecationWarning, + stacklevel=2, + ) + registry_addr: list[tuple[str, int]] = [ + arbiter_sockaddr, + ] + + # TODO: use `.trionics.gather_contexts()` like + # above in `find_actor()` as well? + async with get_registry( + *(registry_addr or actor._reg_addrs[0]), # first if not passed + ) as reg_portal: + sockaddrs = await reg_portal.run_from_ns( 'self', 'wait_for_actor', name=name, ) - sockaddr = sockaddrs[-1] + + # get latest registered addr by default? + # TODO: offer multi-portal yields in multi-homed case? + sockaddr: tuple[str, int] = sockaddrs[-1] async with _connect_chan(*sockaddr) as chan: async with open_portal(chan) as portal: diff --git a/tractor/_entry.py b/tractor/_entry.py index a59975ce..0ac0dc47 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -47,8 +47,8 @@ log = get_logger(__name__) def _mp_main( - actor: Actor, # type: ignore - accept_addr: tuple[str, int], + actor: Actor, + accept_addrs: list[tuple[str, int]], forkserver_info: tuple[Any, Any, Any, Any, Any], start_method: SpawnMethodKey, parent_addr: tuple[str, int] | None = None, @@ -77,8 +77,8 @@ def _mp_main( log.debug(f"parent_addr is {parent_addr}") trio_main = partial( async_main, - actor, - accept_addr, + actor=actor, + accept_addrs=accept_addrs, parent_addr=parent_addr ) try: @@ -96,7 +96,7 @@ def _mp_main( def _trio_main( - actor: Actor, # type: ignore + actor: Actor, *, parent_addr: tuple[str, int] | None = None, infect_asyncio: bool = False, diff --git a/tractor/_ipc.py b/tractor/_ipc.py index e80a1c35..f57d3bd8 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -517,7 +517,9 @@ class Channel: @acm async def _connect_chan( - host: str, port: int + host: str, + port: int + ) -> typing.AsyncGenerator[Channel, None]: ''' Create and connect a channel with disconnect on context manager diff --git a/tractor/_portal.py b/tractor/_portal.py index 0ca44483..ac602dd5 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -461,7 +461,12 @@ class LocalPortal: actor: 'Actor' # type: ignore # noqa channel: Channel - async def run_from_ns(self, ns: str, func_name: str, **kwargs) -> Any: + async def run_from_ns( + self, + ns: str, + func_name: str, + **kwargs, + ) -> Any: ''' Run a requested local function from a namespace path and return it's result. diff --git a/tractor/_root.py b/tractor/_root.py index a1a11d3b..403907ec 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -59,10 +59,10 @@ async def open_root_actor( *, # defaults are above - arbiter_addr: tuple[str, int] | None = None, + registry_addrs: list[tuple[str, int]] | None = None, # defaults are above - registry_addr: tuple[str, int] | None = None, + arbiter_addr: tuple[str, int] | None = None, name: str | None = 'root', @@ -116,19 +116,19 @@ async def open_root_actor( if arbiter_addr is not None: warnings.warn( - '`arbiter_addr` is now deprecated and has been renamed to' - '`registry_addr`.\nUse that instead..', + '`arbiter_addr` is now deprecated\n' + 'Use `registry_addrs: list[tuple]` instead..', DeprecationWarning, stacklevel=2, ) + registry_addrs = [arbiter_addr] - registry_addr = (host, port) = ( - registry_addr - or arbiter_addr - or ( + registry_addrs: list[tuple[str, int]] = ( + registry_addrs + or [ # default on localhost _default_arbiter_host, _default_arbiter_port, - ) + ] ) loglevel = ( @@ -177,60 +177,105 @@ async def open_root_actor( '`stackscope` not installed for use in debug mode!' ) - try: - # make a temporary connection to see if an arbiter exists, - # if one can't be made quickly we assume none exists. - arbiter_found = False + # closed into below ping task-func + ponged_addrs: list[tuple[str, int]] = [] - # TODO: this connect-and-bail forces us to have to carefully - # rewrap TCP 104-connection-reset errors as EOF so as to avoid - # propagating cancel-causing errors to the channel-msg loop - # machinery. Likely it would be better to eventually have - # a "discovery" protocol with basic handshake instead. - with trio.move_on_after(1): - async with _connect_chan(host, port): - arbiter_found = True + async def ping_tpt_socket( + addr: tuple[str, int], + timeout: float = 1, + ) -> None: + ''' + Attempt temporary connection to see if a registry is + listening at the requested address by a tranport layer + ping. - except OSError: - # TODO: make this a "discovery" log level? - logger.warning(f"No actor registry found @ {host}:{port}") + If a connection can't be made quickly we assume none no + server is listening at that addr. - # create a local actor and start up its main routine/task - if arbiter_found: + ''' + try: + # TODO: this connect-and-bail forces us to have to + # carefully rewrap TCP 104-connection-reset errors as + # EOF so as to avoid propagating cancel-causing errors + # to the channel-msg loop machinery. Likely it would + # be better to eventually have a "discovery" protocol + # with basic handshake instead? + with trio.move_on_after(timeout): + async with _connect_chan(*addr): + ponged_addrs.append(addr) + + except OSError: + # TODO: make this a "discovery" log level? + logger.warning(f'No actor registry found @ {addr}') + + async with trio.open_nursery() as tn: + for addr in registry_addrs: + tn.start_soon(ping_tpt_socket, addr) + + trans_bind_addrs: list[tuple[str, int]] = [] + + # Create a new local root-actor instance which IS NOT THE + # REGISTRAR + if ponged_addrs: # we were able to connect to an arbiter - logger.info(f"Arbiter seems to exist @ {host}:{port}") + logger.info( + f'Registry(s) seem(s) to exist @ {ponged_addrs}' + ) actor = Actor( - name or 'anonymous', - arbiter_addr=registry_addr, + name=name or 'anonymous', + registry_addrs=ponged_addrs, loglevel=loglevel, enable_modules=enable_modules, ) - host, port = (host, 0) + # DO NOT use the registry_addrs as the transport server + # addrs for this new non-registar, root-actor. + for host, port in ponged_addrs: + # NOTE: zero triggers dynamic OS port allocation + trans_bind_addrs.append((host, 0)) + # Start this local actor as the "registrar", aka a regular + # actor who manages the local registry of "mailboxes" of + # other process-tree-local sub-actors. else: - # start this local actor as the arbiter (aka a regular actor who - # manages the local registry of "mailboxes") - # Note that if the current actor is the arbiter it is desirable - # for it to stay up indefinitely until a re-election process has - # taken place - which is not implemented yet FYI). + # NOTE that if the current actor IS THE REGISTAR, the + # following init steps are taken: + # - the tranport layer server is bound to each (host, port) + # pair defined in provided registry_addrs, or the default. + trans_bind_addrs = registry_addrs + + # - it is normally desirable for any registrar to stay up + # indefinitely until either all registered (child/sub) + # actors are terminated (via SC supervision) or, + # a re-election process has taken place. + # NOTE: all of ^ which is not implemented yet - see: + # https://github.com/goodboy/tractor/issues/216 + # https://github.com/goodboy/tractor/pull/348 + # https://github.com/goodboy/tractor/issues/296 actor = Arbiter( - name or 'arbiter', - arbiter_addr=registry_addr, + name or 'registrar', + registry_addrs=registry_addrs, loglevel=loglevel, enable_modules=enable_modules, ) + # Start up main task set via core actor-runtime nurseries. try: # assign process-local actor _state._current_actor = actor # start local channel-server and fake the portal API # NOTE: this won't block since we provide the nursery - logger.info(f"Starting local {actor} @ {host}:{port}") + ml_addrs_str: str = '\n'.join( + f'@{addr}' for addr in trans_bind_addrs + ) + logger.info( + f'Starting local {actor.uid} on the following transport addrs:\n' + f'{ml_addrs_str}' + ) # start the actor runtime in a new task async with trio.open_nursery() as nursery: @@ -243,7 +288,7 @@ async def open_root_actor( partial( async_main, actor, - accept_addr=(host, port), + accept_addrs=trans_bind_addrs, parent_addr=None ) ) @@ -255,7 +300,7 @@ async def open_root_actor( BaseExceptionGroup, ) as err: - entered = await _debug._maybe_enter_pm(err) + entered: bool = await _debug._maybe_enter_pm(err) if ( not entered and @@ -263,7 +308,8 @@ async def open_root_actor( ): logger.exception('Root actor crashed:\n') - # always re-raise + # ALWAYS re-raise any error bubbled up from the + # runtime! raise finally: @@ -284,7 +330,7 @@ async def open_root_actor( _state._current_actor = None _state._last_actor_terminated = actor - # restore breakpoint hook state + # restore built-in `breakpoint()` hook state sys.breakpointhook = builtin_bp_handler if orig_bp_path is not None: os.environ['PYTHONBREAKPOINT'] = orig_bp_path @@ -300,10 +346,9 @@ def run_daemon( # runtime kwargs name: str | None = 'root', - registry_addr: tuple[str, int] = ( - _default_arbiter_host, - _default_arbiter_port, - ), + registry_addrs: list[tuple[str, int]] = [ + (_default_arbiter_host, _default_arbiter_port) + ], start_method: str | None = None, debug_mode: bool = False, @@ -327,7 +372,7 @@ def run_daemon( async def _main(): async with open_root_actor( - registry_addr=registry_addr, + registry_addrs=registry_addrs, name=name, start_method=start_method, debug_mode=debug_mode, diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 6b3d9461..d2a9e405 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -45,6 +45,7 @@ from functools import partial from itertools import chain import importlib import importlib.util +import os from pprint import pformat import signal import sys @@ -55,7 +56,7 @@ from typing import ( ) import uuid from types import ModuleType -import os +import warnings import trio from trio import ( @@ -77,8 +78,8 @@ from ._exceptions import ( ContextCancelled, TransportClosed, ) -from ._discovery import get_arbiter from .devx import _debug +from ._discovery import get_registry from ._portal import Portal from . import _state from . import _mp_fixup_main @@ -127,6 +128,11 @@ class Actor: # ugh, we need to get rid of this and replace with a "registry" sys # https://github.com/goodboy/tractor/issues/216 is_arbiter: bool = False + + @property + def is_registrar(self) -> bool: + return self.is_arbiter + msg_buffer_size: int = 2**6 # nursery placeholders filled in by `async_main()` after fork @@ -164,8 +170,12 @@ class Actor: enable_modules: list[str] = [], uid: str | None = None, loglevel: str | None = None, + registry_addrs: list[tuple[str, int]] | None = None, + spawn_method: str | None = None, + + # TODO: remove! arbiter_addr: tuple[str, int] | None = None, - spawn_method: str | None = None + ) -> None: ''' This constructor is called in the parent actor **before** the spawning @@ -189,27 +199,36 @@ class Actor: # always include debugging tools module enable_modules.append('tractor.devx._debug') - mods = {} + self.enable_modules: dict[str, str] = {} for name in enable_modules: - mod = importlib.import_module(name) - mods[name] = _get_mod_abspath(mod) + mod: ModuleType = importlib.import_module(name) + self.enable_modules[name] = _get_mod_abspath(mod) - self.enable_modules = mods self._mods: dict[str, ModuleType] = {} - self.loglevel = loglevel + self.loglevel: str = loglevel - self._arb_addr: tuple[str, int] | None = ( - str(arbiter_addr[0]), - int(arbiter_addr[1]) - ) if arbiter_addr else None + if arbiter_addr is not None: + warnings.warn( + '`Actor(arbiter_addr=)` is now deprecated.\n' + 'Use `registry_addrs: list[tuple]` instead.', + DeprecationWarning, + stacklevel=2, + ) + registry_addrs: list[tuple[str, int]] = [arbiter_addr] + + self._reg_addrs: list[tuple[str, int]] = ( + registry_addrs + or + None + ) # marked by the process spawning backend at startup # will be None for the parent most process started manually # by the user (currently called the "arbiter") - self._spawn_method = spawn_method + self._spawn_method: str = spawn_method self._peers: defaultdict = defaultdict(list) - self._peer_connected: dict = {} + self._peer_connected: dict[tuple[str, str], trio.Event] = {} self._no_more_peers = trio.Event() self._no_more_peers.set() self._ongoing_rpc_tasks = trio.Event() @@ -336,6 +355,12 @@ class Actor: self._no_more_peers = trio.Event() # unset by making new chan = Channel.from_stream(stream) their_uid: tuple[str, str]|None = chan.uid + if their_uid: + log.warning( + f'Re-connection from already known {their_uid}' + ) + else: + log.runtime(f'New connection to us @{chan.raddr}') con_msg: str = '' if their_uid: @@ -880,11 +905,11 @@ class Actor: ) await chan.connect() + # TODO: move this into a `Channel.handshake()`? # Initial handshake: swap names. await self._do_handshake(chan) - accept_addr: tuple[str, int] | None = None - + accept_addrs: list[tuple[str, int]] | None = None if self._spawn_method == "trio": # Receive runtime state from our parent parent_data: dict[str, Any] @@ -897,10 +922,7 @@ class Actor: # if "trace"/"util" mode is enabled? f'{pformat(parent_data)}\n' ) - accept_addr = ( - parent_data.pop('bind_host'), - parent_data.pop('bind_port'), - ) + accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs') rvs = parent_data.pop('_runtime_vars') if rvs['_debug_mode']: @@ -919,17 +941,18 @@ class Actor: for attr, value in parent_data.items(): - if attr == '_arb_addr': + if attr == '_reg_addrs': # XXX: ``msgspec`` doesn't support serializing tuples # so just cash manually here since it's what our # internals expect. - value = tuple(value) if value else None - self._arb_addr = value + self._reg_addrs = [ + tuple(val) for val in value + ] if value else None else: setattr(self, attr, value) - return chan, accept_addr + return chan, accept_addrs except OSError: # failed to connect log.warning( @@ -946,8 +969,8 @@ class Actor: handler_nursery: Nursery, *, # (host, port) to bind for channel server - accept_host: tuple[str, int] | None = None, - accept_port: int = 0, + listen_sockaddrs: list[tuple[str, int]] | None = None, + task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED, ) -> None: ''' @@ -958,30 +981,40 @@ class Actor: `.cancel_server()` is called. ''' + if listen_sockaddrs is None: + listen_sockaddrs = [(None, 0)] + self._server_down = trio.Event() try: async with trio.open_nursery() as server_n: - listeners: list[trio.abc.Listener] = await server_n.start( - partial( - trio.serve_tcp, - self._stream_handler, - # new connections will stay alive even if this server - # is cancelled - handler_nursery=handler_nursery, - port=accept_port, - host=accept_host, + + for host, port in listen_sockaddrs: + listeners: list[trio.abc.Listener] = await server_n.start( + partial( + trio.serve_tcp, + + handler=self._stream_handler, + port=port, + host=host, + + # NOTE: configured such that new + # connections will stay alive even if + # this server is cancelled! + handler_nursery=handler_nursery, + ) ) - ) - sockets: list[trio.socket] = [ - getattr(listener, 'socket', 'unknown socket') - for listener in listeners - ] - log.runtime( - 'Started TCP server(s)\n' - f'|_{sockets}\n' - ) - self._listeners.extend(listeners) + sockets: list[trio.socket] = [ + getattr(listener, 'socket', 'unknown socket') + for listener in listeners + ] + log.runtime( + 'Started TCP server(s)\n' + f'|_{sockets}\n' + ) + self._listeners.extend(listeners) + task_status.started(server_n) + finally: # signal the server is down since nursery above terminated self._server_down.set() @@ -1318,6 +1351,19 @@ class Actor: log.runtime("Shutting down channel server") self._server_n.cancel_scope.cancel() + @property + def accept_addrs(self) -> list[tuple[str, int]]: + ''' + All addresses to which the transport-channel server binds + and listens for new connections. + + ''' + # throws OSError on failure + return [ + listener.socket.getsockname() + for listener in self._listeners + ] # type: ignore + @property def accept_addr(self) -> tuple[str, int]: ''' @@ -1326,7 +1372,7 @@ class Actor: ''' # throws OSError on failure - return self._listeners[0].socket.getsockname() # type: ignore + return self.accept_addrs[0] def get_parent(self) -> Portal: ''' @@ -1343,6 +1389,7 @@ class Actor: ''' return self._peers[uid] + # TODO: move to `Channel.handshake(uid)` async def _do_handshake( self, chan: Channel @@ -1379,7 +1426,7 @@ class Actor: async def async_main( actor: Actor, - accept_addr: tuple[str, int] | None = None, + accept_addrs: tuple[str, int] | None = None, # XXX: currently ``parent_addr`` is only needed for the # ``multiprocessing`` backend (which pickles state sent to @@ -1407,20 +1454,25 @@ async def async_main( # on our debugger lock state. _debug.Lock._trio_handler = signal.getsignal(signal.SIGINT) - registered_with_arbiter = False + is_registered: bool = False try: # establish primary connection with immediate parent - actor._parent_chan = None + actor._parent_chan: Channel | None = None if parent_addr is not None: - actor._parent_chan, accept_addr_rent = await actor._from_parent( - parent_addr) + ( + actor._parent_chan, + set_accept_addr_says_rent, + ) = await actor._from_parent(parent_addr) - # either it's passed in because we're not a child - # or because we're running in mp mode - if accept_addr_rent is not None: - accept_addr = accept_addr_rent + # either it's passed in because we're not a child or + # because we're running in mp mode + if ( + set_accept_addr_says_rent + and set_accept_addr_says_rent is not None + ): + accept_addrs = set_accept_addr_says_rent # The "root" nursery ensures the channel with the immediate # parent is kept alive as a resilient service until @@ -1460,34 +1512,58 @@ async def async_main( # - subactor: the bind address is sent by our parent # over our established channel # - root actor: the ``accept_addr`` passed to this method - assert accept_addr - host, port = accept_addr + assert accept_addrs actor._server_n = await service_nursery.start( partial( actor._serve_forever, service_nursery, - accept_host=host, - accept_port=port + listen_sockaddrs=accept_addrs, ) ) - accept_addr = actor.accept_addr + accept_addrs: list[tuple[str, int]] = actor.accept_addrs + + # NOTE: only set the loopback addr for the + # process-tree-global "root" mailbox since + # all sub-actors should be able to speak to + # their root actor over that channel. if _state._runtime_vars['_is_root']: - _state._runtime_vars['_root_mailbox'] = accept_addr + for addr in accept_addrs: + host, _ = addr + # TODO: generic 'lo' detector predicate + if '127.0.0.1' in host: + _state._runtime_vars['_root_mailbox'] = addr # Register with the arbiter if we're told its addr - log.runtime(f"Registering {actor} for role `{actor.name}`") - assert isinstance(actor._arb_addr, tuple) + log.runtime( + f'Registering `{actor.name}` ->\n' + f'{pformat(accept_addrs)}' + ) - async with get_arbiter(*actor._arb_addr) as arb_portal: - await arb_portal.run_from_ns( - 'self', - 'register_actor', - uid=actor.uid, - sockaddr=accept_addr, - ) + # TODO: ideally we don't fan out to all registrars + # if addresses point to the same actor.. + # So we need a way to detect that? maybe iterate + # only on unique actor uids? + for addr in actor._reg_addrs: + assert isinstance(addr, tuple) + assert addr[1] # non-zero after bind - registered_with_arbiter = True + async with get_registry(*addr) as reg_portal: + for accept_addr in accept_addrs: + + if not accept_addr[1]: + await _debug.pause() + + assert accept_addr[1] + + await reg_portal.run_from_ns( + 'self', + 'register_actor', + uid=actor.uid, + sockaddr=accept_addr, + ) + + is_registered: bool = True # init steps complete task_status.started() @@ -1520,18 +1596,18 @@ async def async_main( log.runtime("Closing all actor lifetime contexts") actor.lifetime_stack.close() - if not registered_with_arbiter: + if not is_registered: # TODO: I guess we could try to connect back # to the parent through a channel and engage a debugger # once we have that all working with std streams locking? log.exception( f"Actor errored and failed to register with arbiter " - f"@ {actor._arb_addr}?") + f"@ {actor._reg_addrs[0]}?") log.error( "\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n" "\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n" - "\tYOUR PARENT CODE IS GOING TO KEEP WORKING FINE!!!\n" - "\tTHIS IS HOW RELIABlE SYSTEMS ARE SUPPOSED TO WORK!?!?\n" + "\tIf this is a sub-actor likely its parent will keep running " + "\tcorrectly if this error is caught and ignored.." ) if actor._parent_chan: @@ -1571,27 +1647,33 @@ async def async_main( # Unregister actor from the registry-sys / registrar. if ( - registered_with_arbiter - and not actor.is_arbiter + is_registered + and not actor.is_registrar ): - failed = False - assert isinstance(actor._arb_addr, tuple) - with trio.move_on_after(0.5) as cs: - cs.shield = True - try: - async with get_arbiter(*actor._arb_addr) as arb_portal: - await arb_portal.run_from_ns( - 'self', - 'unregister_actor', - uid=actor.uid - ) - except OSError: + failed: bool = False + for addr in actor._reg_addrs: + assert isinstance(addr, tuple) + with trio.move_on_after(0.5) as cs: + cs.shield = True + try: + async with get_registry( + *addr, + ) as reg_portal: + await reg_portal.run_from_ns( + 'self', + 'unregister_actor', + uid=actor.uid + ) + except OSError: + failed = True + if cs.cancelled_caught: failed = True - if cs.cancelled_caught: - failed = True - if failed: - log.warning( - f"Failed to unregister {actor.name} from arbiter") + + if failed: + log.warning( + f'Failed to unregister {actor.name} from ' + f'registar @ {addr}' + ) # Ensure all peers (actors connected to us as clients) are finished if not actor._no_more_peers.is_set(): @@ -1610,18 +1692,36 @@ async def async_main( # TODO: rename to `Registry` and move to `._discovery`! class Arbiter(Actor): ''' - A special actor who knows all the other actors and always has - access to a top level nursery. + A special registrar actor who can contact all other actors + within its immediate process tree and possibly keeps a registry + of others meant to be discoverable in a distributed + application. Normally the registrar is also the "root actor" + and thus always has access to the top-most-level actor + (process) nursery. - The arbiter is by default the first actor spawned on each host - and is responsible for keeping track of all other actors for - coordination purposes. If a new main process is launched and an - arbiter is already running that arbiter will be used. + By default, the registrar is always initialized when and if no + other registrar socket addrs have been specified to runtime + init entry-points (such as `open_root_actor()` or + `open_nursery()`). Any time a new main process is launched (and + thus thus a new root actor created) and, no existing registrar + can be contacted at the provided `registry_addr`, then a new + one is always created; however, if one can be reached it is + used. + + Normally a distributed app requires at least registrar per + logical host where for that given "host space" (aka localhost + IPC domain of addresses) it is responsible for making all other + host (local address) bound actors *discoverable* to external + actor trees running on remote hosts. ''' is_arbiter = True - def __init__(self, *args, **kwargs) -> None: + def __init__( + self, + *args, + **kwargs, + ) -> None: self._registry: dict[ tuple[str, str], @@ -1663,7 +1763,10 @@ class Arbiter(Actor): # unpacker since we have tuples as keys (not this makes the # arbiter suscetible to hashdos): # https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10 - return {'.'.join(key): val for key, val in self._registry.items()} + return { + '.'.join(key): val + for key, val in self._registry.items() + } async def wait_for_actor( self, @@ -1706,8 +1809,15 @@ class Arbiter(Actor): sockaddr: tuple[str, int] ) -> None: - uid = name, _ = (str(uid[0]), str(uid[1])) - self._registry[uid] = (str(sockaddr[0]), int(sockaddr[1])) + uid = name, hash = (str(uid[0]), str(uid[1])) + addr = (host, port) = ( + str(sockaddr[0]), + int(sockaddr[1]), + ) + if port == 0: + await _debug.pause() + assert port # should never be 0-dynamic-os-alloc + self._registry[uid] = addr # pop and signal all waiter events events = self._waiters.pop(name, []) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 78c38c84..001a0f10 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -365,7 +365,7 @@ async def new_proc( errors: dict[tuple[str, str], Exception], # passed through to actor main - bind_addr: tuple[str, int], + bind_addrs: list[tuple[str, int]], parent_addr: tuple[str, int], _runtime_vars: dict[str, Any], # serialized and sent to _child @@ -387,7 +387,7 @@ async def new_proc( actor_nursery, subactor, errors, - bind_addr, + bind_addrs, parent_addr, _runtime_vars, # run time vars infect_asyncio=infect_asyncio, @@ -402,7 +402,7 @@ async def trio_proc( errors: dict[tuple[str, str], Exception], # passed through to actor main - bind_addr: tuple[str, int], + bind_addrs: list[tuple[str, int]], parent_addr: tuple[str, int], _runtime_vars: dict[str, Any], # serialized and sent to _child *, @@ -491,12 +491,11 @@ async def trio_proc( # send additional init params await chan.send({ - "_parent_main_data": subactor._parent_main_data, - "enable_modules": subactor.enable_modules, - "_arb_addr": subactor._arb_addr, - "bind_host": bind_addr[0], - "bind_port": bind_addr[1], - "_runtime_vars": _runtime_vars, + '_parent_main_data': subactor._parent_main_data, + 'enable_modules': subactor.enable_modules, + '_reg_addrs': subactor._reg_addrs, + 'bind_addrs': bind_addrs, + '_runtime_vars': _runtime_vars, }) # track subactor in current nursery @@ -602,7 +601,7 @@ async def mp_proc( subactor: Actor, errors: dict[tuple[str, str], Exception], # passed through to actor main - bind_addr: tuple[str, int], + bind_addrs: list[tuple[str, int]], parent_addr: tuple[str, int], _runtime_vars: dict[str, Any], # serialized and sent to _child *, @@ -660,7 +659,7 @@ async def mp_proc( target=_mp_main, args=( subactor, - bind_addr, + bind_addrs, fs_info, _spawn_method, parent_addr, diff --git a/tractor/_supervise.py b/tractor/_supervise.py index c8c2336d..615ba692 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -22,10 +22,7 @@ from contextlib import asynccontextmanager as acm from functools import partial import inspect from pprint import pformat -from typing import ( - Optional, - TYPE_CHECKING, -) +from typing import TYPE_CHECKING import typing import warnings @@ -97,7 +94,7 @@ class ActorNursery: tuple[ Actor, trio.Process | mp.Process, - Optional[Portal], + Portal | None, ] ] = {} # portals spawned with ``run_in_actor()`` are @@ -121,12 +118,12 @@ class ActorNursery: self, name: str, *, - bind_addr: tuple[str, int] = _default_bind_addr, + bind_addrs: list[tuple[str, int]] = [_default_bind_addr], rpc_module_paths: list[str] | None = None, enable_modules: list[str] | None = None, loglevel: str | None = None, # set log level per subactor nursery: trio.Nursery | None = None, - debug_mode: Optional[bool] | None = None, + debug_mode: bool | None = None, infect_asyncio: bool = False, ) -> Portal: ''' @@ -161,7 +158,9 @@ class ActorNursery: # modules allowed to invoked funcs from enable_modules=enable_modules, loglevel=loglevel, - arbiter_addr=current_actor()._arb_addr, + + # verbatim relay this actor's registrar addresses + registry_addrs=current_actor()._reg_addrs, ) parent_addr = self._actor.accept_addr assert parent_addr @@ -178,7 +177,7 @@ class ActorNursery: self, subactor, self.errors, - bind_addr, + bind_addrs, parent_addr, _rtv, # run time vars infect_asyncio=infect_asyncio, @@ -191,8 +190,8 @@ class ActorNursery: fn: typing.Callable, *, - name: Optional[str] = None, - bind_addr: tuple[str, int] = _default_bind_addr, + name: str | None = None, + bind_addrs: tuple[str, int] = [_default_bind_addr], rpc_module_paths: list[str] | None = None, enable_modules: list[str] | None = None, loglevel: str | None = None, # set log level per subactor @@ -221,7 +220,7 @@ class ActorNursery: enable_modules=[mod_path] + ( enable_modules or rpc_module_paths or [] ), - bind_addr=bind_addr, + bind_addrs=bind_addrs, loglevel=loglevel, # use the run_in_actor nursery nursery=self._ria_nursery,