diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 03775ac..22ab88d 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -15,16 +15,19 @@ # along with this program. If not, see . """ -Actor discovery API. +Discovery (protocols) API for automatic addressing and location +management of (service) actors. """ +from __future__ import annotations from typing import ( - Optional, - Union, AsyncGenerator, + TYPE_CHECKING, ) from contextlib import asynccontextmanager as acm +import warnings +from .trionics import gather_contexts from ._ipc import _connect_chan, Channel from ._portal import ( Portal, @@ -34,13 +37,19 @@ from ._portal import ( from ._state import current_actor, _runtime_vars -@acm -async def get_arbiter( +if TYPE_CHECKING: + from ._runtime import Actor + +@acm +async def get_registry( host: str, port: int, -) -> AsyncGenerator[Union[Portal, LocalPortal], None]: +) -> AsyncGenerator[ + Portal | LocalPortal | None, + None, +]: ''' Return a portal instance connected to a local or remote arbiter. @@ -51,16 +60,23 @@ async def get_arbiter( if not actor: raise RuntimeError("No actor instance has been defined yet?") - if actor.is_arbiter: + if actor.is_registrar: # we're already the arbiter # (likely a re-entrant call from the arbiter actor) - yield LocalPortal(actor, Channel((host, port))) + yield LocalPortal( + actor, + Channel((host, port)) + ) else: - async with _connect_chan(host, port) as chan: + async with ( + _connect_chan(host, port) as chan, + open_portal(chan) as regstr_ptl, + ): + yield regstr_ptl - async with open_portal(chan) as arb_portal: - yield arb_portal +# TODO: deprecate and remove _arbiter form +get_arbiter = get_registry @acm @@ -68,51 +84,81 @@ async def get_root( **kwargs, ) -> AsyncGenerator[Portal, None]: + # TODO: rename mailbox to `_root_maddr` when we finally + # add and impl libp2p multi-addrs? host, port = _runtime_vars['_root_mailbox'] assert host is not None - async with _connect_chan(host, port) as chan: - async with open_portal(chan, **kwargs) as portal: - yield portal + async with ( + _connect_chan(host, port) as chan, + open_portal(chan, **kwargs) as portal, + ): + yield portal @acm async def query_actor( name: str, - arbiter_sockaddr: Optional[tuple[str, int]] = None, + arbiter_sockaddr: tuple[str, int] | None = None, + regaddr: tuple[str, int] | None = None, -) -> AsyncGenerator[tuple[str, int], None]: +) -> AsyncGenerator[ + tuple[str, int] | None, + None, +]: ''' - Simple address lookup for a given actor name. + Make a transport address lookup for an actor name to a specific + registrar. - Returns the (socket) address or ``None``. + Returns the (socket) address or ``None`` if no entry under that + name exists for the given registrar listening @ `regaddr`. ''' - actor = current_actor() - async with get_arbiter( - *arbiter_sockaddr or actor._arb_addr - ) as arb_portal: + actor: Actor = current_actor() + if ( + name == 'registrar' + and actor.is_registrar + ): + raise RuntimeError( + 'The current actor IS the registry!?' + ) - sockaddr = await arb_portal.run_from_ns( + if arbiter_sockaddr is not None: + warnings.warn( + '`tractor.query_actor(regaddr=)` is deprecated.\n' + 'Use `registry_addrs: list[tuple]` instead!', + DeprecationWarning, + stacklevel=2, + ) + regaddr: list[tuple[str, int]] = arbiter_sockaddr + + regstr: Portal + async with get_registry( + *(regaddr or actor._reg_addrs[0]) + ) as regstr: + + # TODO: return portals to all available actors - for now + # just the last one that registered + sockaddr: tuple[str, int] = await regstr.run_from_ns( 'self', 'find_actor', name=name, ) - - # TODO: return portals to all available actors - for now just - # the last one that registered - if name == 'arbiter' and actor.is_arbiter: - raise RuntimeError("The current actor is the arbiter") - - yield sockaddr if sockaddr else None + yield sockaddr @acm async def find_actor( name: str, - arbiter_sockaddr: tuple[str, int] | None = None + arbiter_sockaddr: tuple[str, int] | None = None, + registry_addrs: list[tuple[str, int]] | None = None, -) -> AsyncGenerator[Optional[Portal], None]: + only_first: bool = True, + +) -> AsyncGenerator[ + Portal | list[Portal] | None, + None, +]: ''' Ask the arbiter to find actor(s) by name. @@ -120,24 +166,54 @@ async def find_actor( known to the arbiter. ''' - async with query_actor( - name=name, - arbiter_sockaddr=arbiter_sockaddr, - ) as sockaddr: + if arbiter_sockaddr is not None: + warnings.warn( + '`tractor.find_actor(arbiter_sockaddr=)` is deprecated.\n' + 'Use `registry_addrs: list[tuple]` instead!', + DeprecationWarning, + stacklevel=2, + ) + registry_addrs: list[tuple[str, int]] = [arbiter_sockaddr] - if sockaddr: - async with _connect_chan(*sockaddr) as chan: - async with open_portal(chan) as portal: - yield portal - else: + @acm + async def maybe_open_portal_from_reg_addr( + addr: tuple[str, int], + ): + async with query_actor( + name=name, + regaddr=addr, + ) as sockaddr: + if sockaddr: + async with _connect_chan(*sockaddr) as chan: + async with open_portal(chan) as portal: + yield portal + else: + yield None + + async with gather_contexts( + mngrs=list( + maybe_open_portal_from_reg_addr(addr) + for addr in registry_addrs + ) + ) as maybe_portals: + print(f'Portalz: {maybe_portals}') + if not maybe_portals: yield None + return + + portals: list[Portal] = list(maybe_portals) + if only_first: + yield portals[0] + + else: + yield portals @acm async def wait_for_actor( name: str, arbiter_sockaddr: tuple[str, int] | None = None, - # registry_addr: tuple[str, int] | None = None, + registry_addr: tuple[str, int] | None = None, ) -> AsyncGenerator[Portal, None]: ''' @@ -146,17 +222,33 @@ async def wait_for_actor( A portal to the first registered actor is returned. ''' - actor = current_actor() + actor: Actor = current_actor() - async with get_arbiter( - *arbiter_sockaddr or actor._arb_addr, - ) as arb_portal: - sockaddrs = await arb_portal.run_from_ns( + if arbiter_sockaddr is not None: + warnings.warn( + '`tractor.wait_for_actor(arbiter_sockaddr=)` is deprecated.\n' + 'Use `registry_addr: tuple` instead!', + DeprecationWarning, + stacklevel=2, + ) + registry_addr: list[tuple[str, int]] = [ + arbiter_sockaddr, + ] + + # TODO: use `.trionics.gather_contexts()` like + # above in `find_actor()` as well? + async with get_registry( + *(registry_addr or actor._reg_addrs[0]), # first if not passed + ) as reg_portal: + sockaddrs = await reg_portal.run_from_ns( 'self', 'wait_for_actor', name=name, ) - sockaddr = sockaddrs[-1] + + # get latest registered addr by default? + # TODO: offer multi-portal yields in multi-homed case? + sockaddr: tuple[str, int] = sockaddrs[-1] async with _connect_chan(*sockaddr) as chan: async with open_portal(chan) as portal: diff --git a/tractor/_entry.py b/tractor/_entry.py index e8fb56d..b5ab405 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -47,8 +47,8 @@ log = get_logger(__name__) def _mp_main( - actor: Actor, # type: ignore - accept_addr: tuple[str, int], + actor: Actor, + accept_addrs: list[tuple[str, int]], forkserver_info: tuple[Any, Any, Any, Any, Any], start_method: SpawnMethodKey, parent_addr: tuple[str, int] | None = None, @@ -77,8 +77,8 @@ def _mp_main( log.debug(f"parent_addr is {parent_addr}") trio_main = partial( async_main, - actor, - accept_addr, + actor=actor, + accept_addrs=accept_addrs, parent_addr=parent_addr ) try: @@ -96,7 +96,7 @@ def _mp_main( def _trio_main( - actor: Actor, # type: ignore + actor: Actor, *, parent_addr: tuple[str, int] | None = None, infect_asyncio: bool = False, @@ -132,7 +132,9 @@ def _trio_main( else: trio.run(trio_main) except KeyboardInterrupt: - log.cancel(f"Actor {actor.uid} received KBI") + log.cancel( + f'Actor@{actor.uid} received KBI' + ) finally: log.info(f"Actor {actor.uid} terminated") diff --git a/tractor/_ipc.py b/tractor/_ipc.py index ebfd261..a022908 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -467,7 +467,9 @@ class Channel: @asynccontextmanager async def _connect_chan( - host: str, port: int + host: str, + port: int + ) -> typing.AsyncGenerator[Channel, None]: ''' Create and connect a channel with disconnect on context manager diff --git a/tractor/_portal.py b/tractor/_portal.py index 6029371..53684b4 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -586,7 +586,12 @@ class LocalPortal: actor: 'Actor' # type: ignore # noqa channel: Channel - async def run_from_ns(self, ns: str, func_name: str, **kwargs) -> Any: + async def run_from_ns( + self, + ns: str, + func_name: str, + **kwargs, + ) -> Any: ''' Run a requested local function from a namespace path and return it's result. diff --git a/tractor/_root.py b/tractor/_root.py index a19652d..f64aa69 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -58,10 +58,10 @@ async def open_root_actor( *, # defaults are above - arbiter_addr: tuple[str, int] | None = None, + registry_addrs: list[tuple[str, int]] | None = None, # defaults are above - registry_addr: tuple[str, int] | None = None, + arbiter_addr: tuple[str, int] | None = None, name: str | None = 'root', @@ -115,19 +115,19 @@ async def open_root_actor( if arbiter_addr is not None: warnings.warn( - '`arbiter_addr` is now deprecated and has been renamed to' - '`registry_addr`.\nUse that instead..', + '`arbiter_addr` is now deprecated\n' + 'Use `registry_addrs: list[tuple]` instead..', DeprecationWarning, stacklevel=2, ) + registry_addrs = [arbiter_addr] - registry_addr = (host, port) = ( - registry_addr - or arbiter_addr - or ( + registry_addrs: list[tuple[str, int]] = ( + registry_addrs + or [ # default on localhost _default_arbiter_host, _default_arbiter_port, - ) + ] ) loglevel = (loglevel or log._default_loglevel).upper() @@ -157,60 +157,105 @@ async def open_root_actor( log.get_console_log(loglevel) - try: - # make a temporary connection to see if an arbiter exists, - # if one can't be made quickly we assume none exists. - arbiter_found = False + # closed into below ping task-func + ponged_addrs: list[tuple[str, int]] = [] - # TODO: this connect-and-bail forces us to have to carefully - # rewrap TCP 104-connection-reset errors as EOF so as to avoid - # propagating cancel-causing errors to the channel-msg loop - # machinery. Likely it would be better to eventually have - # a "discovery" protocol with basic handshake instead. - with trio.move_on_after(1): - async with _connect_chan(host, port): - arbiter_found = True + async def ping_tpt_socket( + addr: tuple[str, int], + timeout: float = 1, + ) -> None: + ''' + Attempt temporary connection to see if a registry is + listening at the requested address by a tranport layer + ping. - except OSError: - # TODO: make this a "discovery" log level? - logger.warning(f"No actor registry found @ {host}:{port}") + If a connection can't be made quickly we assume none no + server is listening at that addr. - # create a local actor and start up its main routine/task - if arbiter_found: + ''' + try: + # TODO: this connect-and-bail forces us to have to + # carefully rewrap TCP 104-connection-reset errors as + # EOF so as to avoid propagating cancel-causing errors + # to the channel-msg loop machinery. Likely it would + # be better to eventually have a "discovery" protocol + # with basic handshake instead? + with trio.move_on_after(timeout): + async with _connect_chan(*addr): + ponged_addrs.append(addr) + + except OSError: + # TODO: make this a "discovery" log level? + logger.warning(f'No actor registry found @ {addr}') + + async with trio.open_nursery() as tn: + for addr in registry_addrs: + tn.start_soon(ping_tpt_socket, addr) + + trans_bind_addrs: list[tuple[str, int]] = [] + + # Create a new local root-actor instance which IS NOT THE + # REGISTRAR + if ponged_addrs: # we were able to connect to an arbiter - logger.info(f"Arbiter seems to exist @ {host}:{port}") + logger.info( + f'Registry(s) seem(s) to exist @ {ponged_addrs}' + ) actor = Actor( - name or 'anonymous', - arbiter_addr=registry_addr, + name=name or 'anonymous', + registry_addrs=ponged_addrs, loglevel=loglevel, enable_modules=enable_modules, ) - host, port = (host, 0) + # DO NOT use the registry_addrs as the transport server + # addrs for this new non-registar, root-actor. + for host, port in ponged_addrs: + # NOTE: zero triggers dynamic OS port allocation + trans_bind_addrs.append((host, 0)) + # Start this local actor as the "registrar", aka a regular + # actor who manages the local registry of "mailboxes" of + # other process-tree-local sub-actors. else: - # start this local actor as the arbiter (aka a regular actor who - # manages the local registry of "mailboxes") - # Note that if the current actor is the arbiter it is desirable - # for it to stay up indefinitely until a re-election process has - # taken place - which is not implemented yet FYI). + # NOTE that if the current actor IS THE REGISTAR, the + # following init steps are taken: + # - the tranport layer server is bound to each (host, port) + # pair defined in provided registry_addrs, or the default. + trans_bind_addrs = registry_addrs + + # - it is normally desirable for any registrar to stay up + # indefinitely until either all registered (child/sub) + # actors are terminated (via SC supervision) or, + # a re-election process has taken place. + # NOTE: all of ^ which is not implemented yet - see: + # https://github.com/goodboy/tractor/issues/216 + # https://github.com/goodboy/tractor/pull/348 + # https://github.com/goodboy/tractor/issues/296 actor = Arbiter( - name or 'arbiter', - arbiter_addr=registry_addr, + name or 'registrar', + registry_addrs=registry_addrs, loglevel=loglevel, enable_modules=enable_modules, ) + # Start up main task set via core actor-runtime nurseries. try: # assign process-local actor _state._current_actor = actor # start local channel-server and fake the portal API # NOTE: this won't block since we provide the nursery - logger.info(f"Starting local {actor} @ {host}:{port}") + ml_addrs_str: str = '\n'.join( + f'@{addr}' for addr in trans_bind_addrs + ) + logger.info( + f'Starting local {actor.uid} on the following transport addrs:\n' + f'{ml_addrs_str}' + ) # start the actor runtime in a new task async with trio.open_nursery() as nursery: @@ -223,7 +268,7 @@ async def open_root_actor( partial( async_main, actor, - accept_addr=(host, port), + accept_addrs=trans_bind_addrs, parent_addr=None ) ) @@ -235,13 +280,16 @@ async def open_root_actor( BaseExceptionGroup, ) as err: + entered: bool = await _debug._maybe_enter_pm(err) + if ( - not (await _debug._maybe_enter_pm(err)) + not entered and not is_multi_cancelled(err) ): logger.exception("Root actor crashed:") - # always re-raise + # ALWAYS re-raise any error bubbled up from the + # runtime! raise finally: @@ -261,7 +309,7 @@ async def open_root_actor( finally: _state._current_actor = None - # restore breakpoint hook state + # restore built-in `breakpoint()` hook state sys.breakpointhook = builtin_bp_handler if orig_bp_path is not None: os.environ['PYTHONBREAKPOINT'] = orig_bp_path @@ -277,10 +325,9 @@ def run_daemon( # runtime kwargs name: str | None = 'root', - registry_addr: tuple[str, int] = ( - _default_arbiter_host, - _default_arbiter_port, - ), + registry_addrs: list[tuple[str, int]] = [ + (_default_arbiter_host, _default_arbiter_port) + ], start_method: str | None = None, debug_mode: bool = False, @@ -304,7 +351,7 @@ def run_daemon( async def _main(): async with open_root_actor( - registry_addr=registry_addr, + registry_addrs=registry_addrs, name=name, start_method=start_method, debug_mode=debug_mode, diff --git a/tractor/_runtime.py b/tractor/_runtime.py index c9e4bfe..16f105c 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -25,12 +25,12 @@ from itertools import chain import importlib import importlib.util import inspect +from pprint import pformat import signal import sys from typing import ( Any, Callable, - Union, TYPE_CHECKING, ) import uuid @@ -59,7 +59,7 @@ from ._exceptions import ( TransportClosed, ) from . import _debug -from ._discovery import get_arbiter +from ._discovery import get_registry from ._portal import Portal from . import _state from . import _mp_fixup_main @@ -82,7 +82,7 @@ async def _invoke( is_rpc: bool = True, task_status: TaskStatus[ - Union[Context, BaseException] + Context | BaseException ] = trio.TASK_STATUS_IGNORED, ): ''' @@ -96,8 +96,14 @@ async def _invoke( failed_resp: bool = False if _state.debug_mode(): - import greenback - await greenback.ensure_portal() + try: + import greenback + await greenback.ensure_portal() + except ModuleNotFoundError: + log.warning( + '`greenback` is not installed.\n' + 'No sync debug support!' + ) # possibly a traceback (not sure what typing is for this..) tb = None @@ -416,13 +422,13 @@ async def _invoke( actor._ongoing_rpc_tasks.set() -def _get_mod_abspath(module): +def _get_mod_abspath(module: ModuleType) -> str: return os.path.abspath(module.__file__) async def try_ship_error_to_parent( channel: Channel, - err: Union[Exception, BaseExceptionGroup], + err: Exception | BaseExceptionGroup, ) -> None: with trio.CancelScope(shield=True): @@ -469,6 +475,11 @@ class Actor: # ugh, we need to get rid of this and replace with a "registry" sys # https://github.com/goodboy/tractor/issues/216 is_arbiter: bool = False + + @property + def is_registrar(self) -> bool: + return self.is_arbiter + msg_buffer_size: int = 2**6 # nursery placeholders filled in by `async_main()` after fork @@ -501,8 +512,12 @@ class Actor: enable_modules: list[str] = [], uid: str | None = None, loglevel: str | None = None, + registry_addrs: list[tuple[str, int]] | None = None, + spawn_method: str | None = None, + + # TODO: remove! arbiter_addr: tuple[str, int] | None = None, - spawn_method: str | None = None + ) -> None: ''' This constructor is called in the parent actor **before** the spawning @@ -523,27 +538,36 @@ class Actor: # always include debugging tools module enable_modules.append('tractor._debug') - mods = {} + self.enable_modules: dict[str, str] = {} for name in enable_modules: - mod = importlib.import_module(name) - mods[name] = _get_mod_abspath(mod) + mod: ModuleType = importlib.import_module(name) + self.enable_modules[name] = _get_mod_abspath(mod) - self.enable_modules = mods self._mods: dict[str, ModuleType] = {} - self.loglevel = loglevel + self.loglevel: str = loglevel - self._arb_addr: tuple[str, int] | None = ( - str(arbiter_addr[0]), - int(arbiter_addr[1]) - ) if arbiter_addr else None + if arbiter_addr is not None: + warnings.warn( + '`Actor(arbiter_addr=)` is now deprecated.\n' + 'Use `registry_addrs: list[tuple]` instead.', + DeprecationWarning, + stacklevel=2, + ) + registry_addrs: list[tuple[str, int]] = [arbiter_addr] + + self._reg_addrs: list[tuple[str, int]] = ( + registry_addrs + or + None + ) # marked by the process spawning backend at startup # will be None for the parent most process started manually # by the user (currently called the "arbiter") - self._spawn_method = spawn_method + self._spawn_method: str = spawn_method self._peers: defaultdict = defaultdict(list) - self._peer_connected: dict = {} + self._peer_connected: dict[tuple[str, str], trio.Event] = {} self._no_more_peers = trio.Event() self._no_more_peers.set() self._ongoing_rpc_tasks = trio.Event() @@ -654,13 +678,17 @@ class Actor: self._no_more_peers = trio.Event() # unset chan = Channel.from_stream(stream) - uid: tuple[str, str] | None = chan.uid - log.runtime(f"New connection to us {chan}") + their_uid: tuple[str, str] | None = chan.uid + if their_uid: + log.warning( + f'Re-connection from already known {their_uid}' + ) + else: + log.runtime(f'New connection to us @{chan.raddr}') # send/receive initial handshake response try: uid = await self._do_handshake(chan) - except ( # we need this for ``msgspec`` for some reason? # for now, it's been put in the stream backend. @@ -956,7 +984,11 @@ class Actor: async def _from_parent( self, parent_addr: tuple[str, int] | None, - ) -> tuple[Channel, tuple[str, int] | None]: + + ) -> tuple[ + Channel, + list[tuple[str, int]] | None, + ]: try: # Connect back to the parent actor and conduct initial # handshake. From this point on if we error, we @@ -966,11 +998,11 @@ class Actor: ) await chan.connect() + # TODO: move this into a `Channel.handshake()`? # Initial handshake: swap names. await self._do_handshake(chan) - accept_addr: tuple[str, int] | None = None - + accept_addrs: list[tuple[str, int]] | None = None if self._spawn_method == "trio": # Receive runtime state from our parent parent_data: dict[str, Any] @@ -979,10 +1011,7 @@ class Actor: "Received state from parent:\n" f"{parent_data}" ) - accept_addr = ( - parent_data.pop('bind_host'), - parent_data.pop('bind_port'), - ) + accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs') rvs = parent_data.pop('_runtime_vars') log.runtime(f"Runtime vars are: {rvs}") rvs['_is_root'] = False @@ -990,17 +1019,18 @@ class Actor: for attr, value in parent_data.items(): - if attr == '_arb_addr': + if attr == '_reg_addrs': # XXX: ``msgspec`` doesn't support serializing tuples # so just cash manually here since it's what our # internals expect. - value = tuple(value) if value else None - self._arb_addr = value + self._reg_addrs = [ + tuple(val) for val in value + ] if value else None else: setattr(self, attr, value) - return chan, accept_addr + return chan, accept_addrs except OSError: # failed to connect log.warning( @@ -1014,8 +1044,8 @@ class Actor: handler_nursery: trio.Nursery, *, # (host, port) to bind for channel server - accept_host: tuple[str, int] | None = None, - accept_port: int = 0, + listen_sockaddrs: list[tuple[str, int]] | None = None, + task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED, ) -> None: ''' @@ -1025,28 +1055,39 @@ class Actor: ``cancel_server()`` is called. ''' + if listen_sockaddrs is None: + listen_sockaddrs = [(None, 0)] + self._server_down = trio.Event() try: async with trio.open_nursery() as server_n: - listeners: list[trio.abc.Listener] = await server_n.start( - partial( - trio.serve_tcp, - self._stream_handler, - # new connections will stay alive even if this server - # is cancelled - handler_nursery=handler_nursery, - port=accept_port, - host=accept_host, + + for host, port in listen_sockaddrs: + listeners: list[trio.abc.Listener] = await server_n.start( + partial( + trio.serve_tcp, + + handler=self._stream_handler, + port=port, + host=host, + + # NOTE: configured such that new + # connections will stay alive even if + # this server is cancelled! + handler_nursery=handler_nursery, + ) ) - ) - sockets: list[trio.socket] = [ - getattr(listener, 'socket', 'unknown socket') - for listener in listeners - ] - log.runtime( - f'Started tcp server(s) on {sockets}') - self._listeners.extend(listeners) + sockets: list[trio.socket] = [ + getattr(listener, 'socket', 'unknown socket') + for listener in listeners + ] + log.runtime( + f'Started tcp server(s) on {sockets}' + ) + self._listeners.extend(listeners) + task_status.started(server_n) + finally: # signal the server is down since nursery above terminated self._server_down.set() @@ -1226,13 +1267,26 @@ class Actor: self._server_n.cancel_scope.cancel() @property - def accept_addr(self) -> tuple[str, int] | None: + def accept_addrs(self) -> list[tuple[str, int]]: + ''' + All addresses to which the transport-channel server binds + and listens for new connections. + + ''' + # throws OSError on failure + return [ + listener.socket.getsockname() + for listener in self._listeners + ] # type: ignore + + @property + def accept_addr(self) -> tuple[str, int]: ''' Primary address to which the channel server is bound. ''' # throws OSError on failure - return self._listeners[0].socket.getsockname() # type: ignore + return self.accept_addrs[0] def get_parent(self) -> Portal: ''' @@ -1249,6 +1303,7 @@ class Actor: ''' return self._peers[uid] + # TODO: move to `Channel.handshake(uid)` async def _do_handshake( self, chan: Channel @@ -1278,7 +1333,7 @@ class Actor: async def async_main( actor: Actor, - accept_addr: tuple[str, int] | None = None, + accept_addrs: tuple[str, int] | None = None, # XXX: currently ``parent_addr`` is only needed for the # ``multiprocessing`` backend (which pickles state sent to @@ -1303,20 +1358,25 @@ async def async_main( # on our debugger lock state. _debug.Lock._trio_handler = signal.getsignal(signal.SIGINT) - registered_with_arbiter = False + is_registered: bool = False try: # establish primary connection with immediate parent - actor._parent_chan = None + actor._parent_chan: Channel | None = None if parent_addr is not None: - actor._parent_chan, accept_addr_rent = await actor._from_parent( - parent_addr) + ( + actor._parent_chan, + set_accept_addr_says_rent, + ) = await actor._from_parent(parent_addr) - # either it's passed in because we're not a child - # or because we're running in mp mode - if accept_addr_rent is not None: - accept_addr = accept_addr_rent + # either it's passed in because we're not a child or + # because we're running in mp mode + if ( + set_accept_addr_says_rent + and set_accept_addr_says_rent is not None + ): + accept_addrs = set_accept_addr_says_rent # load exposed/allowed RPC modules # XXX: do this **after** establishing a channel to the parent @@ -1340,38 +1400,62 @@ async def async_main( actor._service_n = service_nursery assert actor._service_n - # Startup up the channel server with, + # Startup up the transport(-channel) server with, # - subactor: the bind address is sent by our parent # over our established channel # - root actor: the ``accept_addr`` passed to this method - assert accept_addr - host, port = accept_addr + assert accept_addrs actor._server_n = await service_nursery.start( partial( actor._serve_forever, service_nursery, - accept_host=host, - accept_port=port + listen_sockaddrs=accept_addrs, ) ) - accept_addr = actor.accept_addr + accept_addrs: list[tuple[str, int]] = actor.accept_addrs + + # NOTE: only set the loopback addr for the + # process-tree-global "root" mailbox since + # all sub-actors should be able to speak to + # their root actor over that channel. if _state._runtime_vars['_is_root']: - _state._runtime_vars['_root_mailbox'] = accept_addr + for addr in accept_addrs: + host, _ = addr + # TODO: generic 'lo' detector predicate + if '127.0.0.1' in host: + _state._runtime_vars['_root_mailbox'] = addr # Register with the arbiter if we're told its addr - log.runtime(f"Registering {actor} for role `{actor.name}`") - assert isinstance(actor._arb_addr, tuple) + log.runtime( + f'Registering `{actor.name}` ->\n' + f'{pformat(accept_addrs)}' + ) - async with get_arbiter(*actor._arb_addr) as arb_portal: - await arb_portal.run_from_ns( - 'self', - 'register_actor', - uid=actor.uid, - sockaddr=accept_addr, - ) + # TODO: ideally we don't fan out to all registrars + # if addresses point to the same actor.. + # So we need a way to detect that? maybe iterate + # only on unique actor uids? + for addr in actor._reg_addrs: + assert isinstance(addr, tuple) + assert addr[1] # non-zero after bind - registered_with_arbiter = True + async with get_registry(*addr) as reg_portal: + for accept_addr in accept_addrs: + + if not accept_addr[1]: + await _debug.pause() + + assert accept_addr[1] + + await reg_portal.run_from_ns( + 'self', + 'register_actor', + uid=actor.uid, + sockaddr=accept_addr, + ) + + is_registered: bool = True # init steps complete task_status.started() @@ -1401,18 +1485,18 @@ async def async_main( log.runtime("Closing all actor lifetime contexts") actor.lifetime_stack.close() - if not registered_with_arbiter: + if not is_registered: # TODO: I guess we could try to connect back # to the parent through a channel and engage a debugger # once we have that all working with std streams locking? log.exception( f"Actor errored and failed to register with arbiter " - f"@ {actor._arb_addr}?") + f"@ {actor._reg_addrs[0]}?") log.error( "\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n" "\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n" - "\tYOUR PARENT CODE IS GOING TO KEEP WORKING FINE!!!\n" - "\tTHIS IS HOW RELIABlE SYSTEMS ARE SUPPOSED TO WORK!?!?\n" + "\tIf this is a sub-actor likely its parent will keep running " + "\tcorrectly if this error is caught and ignored.." ) if actor._parent_chan: @@ -1447,29 +1531,35 @@ async def async_main( actor.lifetime_stack.close() - # Unregister actor from the arbiter + # Unregister actor from the registry if ( - registered_with_arbiter - and not actor.is_arbiter + is_registered + and not actor.is_registrar ): - failed = False - assert isinstance(actor._arb_addr, tuple) - with trio.move_on_after(0.5) as cs: - cs.shield = True - try: - async with get_arbiter(*actor._arb_addr) as arb_portal: - await arb_portal.run_from_ns( - 'self', - 'unregister_actor', - uid=actor.uid - ) - except OSError: + failed: bool = False + for addr in actor._reg_addrs: + assert isinstance(addr, tuple) + with trio.move_on_after(0.5) as cs: + cs.shield = True + try: + async with get_registry( + *addr, + ) as reg_portal: + await reg_portal.run_from_ns( + 'self', + 'unregister_actor', + uid=actor.uid + ) + except OSError: + failed = True + if cs.cancelled_caught: failed = True - if cs.cancelled_caught: - failed = True - if failed: - log.warning( - f"Failed to unregister {actor.name} from arbiter") + + if failed: + log.warning( + f'Failed to unregister {actor.name} from ' + f'registar @ {addr}' + ) # Ensure all peers (actors connected to us as clients) are finished if not actor._no_more_peers.is_set(): @@ -1761,18 +1851,36 @@ async def process_messages( class Arbiter(Actor): ''' - A special actor who knows all the other actors and always has - access to a top level nursery. + A special registrar actor who can contact all other actors + within its immediate process tree and possibly keeps a registry + of others meant to be discoverable in a distributed + application. Normally the registrar is also the "root actor" + and thus always has access to the top-most-level actor + (process) nursery. - The arbiter is by default the first actor spawned on each host - and is responsible for keeping track of all other actors for - coordination purposes. If a new main process is launched and an - arbiter is already running that arbiter will be used. + By default, the registrar is always initialized when and if no + other registrar socket addrs have been specified to runtime + init entry-points (such as `open_root_actor()` or + `open_nursery()`). Any time a new main process is launched (and + thus thus a new root actor created) and, no existing registrar + can be contacted at the provided `registry_addr`, then a new + one is always created; however, if one can be reached it is + used. + + Normally a distributed app requires at least registrar per + logical host where for that given "host space" (aka localhost + IPC domain of addresses) it is responsible for making all other + host (local address) bound actors *discoverable* to external + actor trees running on remote hosts. ''' is_arbiter = True - def __init__(self, *args, **kwargs) -> None: + def __init__( + self, + *args, + **kwargs, + ) -> None: self._registry: dict[ tuple[str, str], @@ -1814,7 +1922,10 @@ class Arbiter(Actor): # unpacker since we have tuples as keys (not this makes the # arbiter suscetible to hashdos): # https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10 - return {'.'.join(key): val for key, val in self._registry.items()} + return { + '.'.join(key): val + for key, val in self._registry.items() + } async def wait_for_actor( self, @@ -1852,8 +1963,15 @@ class Arbiter(Actor): sockaddr: tuple[str, int] ) -> None: - uid = name, _ = (str(uid[0]), str(uid[1])) - self._registry[uid] = (str(sockaddr[0]), int(sockaddr[1])) + uid = name, hash = (str(uid[0]), str(uid[1])) + addr = (host, port) = ( + str(sockaddr[0]), + int(sockaddr[1]), + ) + if port == 0: + await _debug.pause() + assert port # should never be 0-dynamic-os-alloc + self._registry[uid] = addr # pop and signal all waiter events events = self._waiters.pop(name, []) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index db46542..985b810 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -294,7 +294,7 @@ async def new_proc( errors: dict[tuple[str, str], Exception], # passed through to actor main - bind_addr: tuple[str, int], + bind_addrs: list[tuple[str, int]], parent_addr: tuple[str, int], _runtime_vars: dict[str, Any], # serialized and sent to _child @@ -316,7 +316,7 @@ async def new_proc( actor_nursery, subactor, errors, - bind_addr, + bind_addrs, parent_addr, _runtime_vars, # run time vars infect_asyncio=infect_asyncio, @@ -331,7 +331,7 @@ async def trio_proc( errors: dict[tuple[str, str], Exception], # passed through to actor main - bind_addr: tuple[str, int], + bind_addrs: list[tuple[str, int]], parent_addr: tuple[str, int], _runtime_vars: dict[str, Any], # serialized and sent to _child *, @@ -417,12 +417,11 @@ async def trio_proc( # send additional init params await chan.send({ - "_parent_main_data": subactor._parent_main_data, - "enable_modules": subactor.enable_modules, - "_arb_addr": subactor._arb_addr, - "bind_host": bind_addr[0], - "bind_port": bind_addr[1], - "_runtime_vars": _runtime_vars, + '_parent_main_data': subactor._parent_main_data, + 'enable_modules': subactor.enable_modules, + '_reg_addrs': subactor._reg_addrs, + 'bind_addrs': bind_addrs, + '_runtime_vars': _runtime_vars, }) # track subactor in current nursery @@ -509,7 +508,7 @@ async def mp_proc( subactor: Actor, errors: dict[tuple[str, str], Exception], # passed through to actor main - bind_addr: tuple[str, int], + bind_addrs: list[tuple[str, int]], parent_addr: tuple[str, int], _runtime_vars: dict[str, Any], # serialized and sent to _child *, @@ -567,7 +566,7 @@ async def mp_proc( target=_mp_main, args=( subactor, - bind_addr, + bind_addrs, fs_info, _spawn_method, parent_addr, diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 7f77784..e8599fd 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -21,10 +21,7 @@ from contextlib import asynccontextmanager as acm from functools import partial import inspect -from typing import ( - Optional, - TYPE_CHECKING, -) +from typing import TYPE_CHECKING import typing import warnings @@ -94,7 +91,7 @@ class ActorNursery: tuple[ Actor, trio.Process | mp.Process, - Optional[Portal], + Portal | None, ] ] = {} # portals spawned with ``run_in_actor()`` are @@ -110,12 +107,12 @@ class ActorNursery: self, name: str, *, - bind_addr: tuple[str, int] = _default_bind_addr, + bind_addrs: list[tuple[str, int]] = [_default_bind_addr], rpc_module_paths: list[str] | None = None, enable_modules: list[str] | None = None, loglevel: str | None = None, # set log level per subactor nursery: trio.Nursery | None = None, - debug_mode: Optional[bool] | None = None, + debug_mode: bool | None = None, infect_asyncio: bool = False, ) -> Portal: ''' @@ -150,7 +147,9 @@ class ActorNursery: # modules allowed to invoked funcs from enable_modules=enable_modules, loglevel=loglevel, - arbiter_addr=current_actor()._arb_addr, + + # verbatim relay this actor's registrar addresses + registry_addrs=current_actor()._reg_addrs, ) parent_addr = self._actor.accept_addr assert parent_addr @@ -167,7 +166,7 @@ class ActorNursery: self, subactor, self.errors, - bind_addr, + bind_addrs, parent_addr, _rtv, # run time vars infect_asyncio=infect_asyncio, @@ -180,8 +179,8 @@ class ActorNursery: fn: typing.Callable, *, - name: Optional[str] = None, - bind_addr: tuple[str, int] = _default_bind_addr, + name: str | None = None, + bind_addrs: tuple[str, int] = [_default_bind_addr], rpc_module_paths: list[str] | None = None, enable_modules: list[str] | None = None, loglevel: str | None = None, # set log level per subactor @@ -208,7 +207,7 @@ class ActorNursery: enable_modules=[mod_path] + ( enable_modules or rpc_module_paths or [] ), - bind_addr=bind_addr, + bind_addrs=bind_addrs, loglevel=loglevel, # use the run_in_actor nursery nursery=self._ria_nursery,