diff --git a/tractor/_discovery.py b/tractor/_discovery.py
index 03775ac2..22ab88d1 100644
--- a/tractor/_discovery.py
+++ b/tractor/_discovery.py
@@ -15,16 +15,19 @@
# along with this program. If not, see .
"""
-Actor discovery API.
+Discovery (protocols) API for automatic addressing and location
+management of (service) actors.
"""
+from __future__ import annotations
from typing import (
- Optional,
- Union,
AsyncGenerator,
+ TYPE_CHECKING,
)
from contextlib import asynccontextmanager as acm
+import warnings
+from .trionics import gather_contexts
from ._ipc import _connect_chan, Channel
from ._portal import (
Portal,
@@ -34,13 +37,19 @@ from ._portal import (
from ._state import current_actor, _runtime_vars
-@acm
-async def get_arbiter(
+if TYPE_CHECKING:
+ from ._runtime import Actor
+
+@acm
+async def get_registry(
host: str,
port: int,
-) -> AsyncGenerator[Union[Portal, LocalPortal], None]:
+) -> AsyncGenerator[
+ Portal | LocalPortal | None,
+ None,
+]:
'''
Return a portal instance connected to a local or remote
arbiter.
@@ -51,16 +60,23 @@ async def get_arbiter(
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
- if actor.is_arbiter:
+ if actor.is_registrar:
# we're already the arbiter
# (likely a re-entrant call from the arbiter actor)
- yield LocalPortal(actor, Channel((host, port)))
+ yield LocalPortal(
+ actor,
+ Channel((host, port))
+ )
else:
- async with _connect_chan(host, port) as chan:
+ async with (
+ _connect_chan(host, port) as chan,
+ open_portal(chan) as regstr_ptl,
+ ):
+ yield regstr_ptl
- async with open_portal(chan) as arb_portal:
- yield arb_portal
+# TODO: deprecate and remove _arbiter form
+get_arbiter = get_registry
@acm
@@ -68,51 +84,81 @@ async def get_root(
**kwargs,
) -> AsyncGenerator[Portal, None]:
+ # TODO: rename mailbox to `_root_maddr` when we finally
+ # add and impl libp2p multi-addrs?
host, port = _runtime_vars['_root_mailbox']
assert host is not None
- async with _connect_chan(host, port) as chan:
- async with open_portal(chan, **kwargs) as portal:
- yield portal
+ async with (
+ _connect_chan(host, port) as chan,
+ open_portal(chan, **kwargs) as portal,
+ ):
+ yield portal
@acm
async def query_actor(
name: str,
- arbiter_sockaddr: Optional[tuple[str, int]] = None,
+ arbiter_sockaddr: tuple[str, int] | None = None,
+ regaddr: tuple[str, int] | None = None,
-) -> AsyncGenerator[tuple[str, int], None]:
+) -> AsyncGenerator[
+ tuple[str, int] | None,
+ None,
+]:
'''
- Simple address lookup for a given actor name.
+ Make a transport address lookup for an actor name to a specific
+ registrar.
- Returns the (socket) address or ``None``.
+ Returns the (socket) address or ``None`` if no entry under that
+ name exists for the given registrar listening @ `regaddr`.
'''
- actor = current_actor()
- async with get_arbiter(
- *arbiter_sockaddr or actor._arb_addr
- ) as arb_portal:
+ actor: Actor = current_actor()
+ if (
+ name == 'registrar'
+ and actor.is_registrar
+ ):
+ raise RuntimeError(
+ 'The current actor IS the registry!?'
+ )
- sockaddr = await arb_portal.run_from_ns(
+ if arbiter_sockaddr is not None:
+ warnings.warn(
+ '`tractor.query_actor(regaddr=)` is deprecated.\n'
+ 'Use `registry_addrs: list[tuple]` instead!',
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ regaddr: list[tuple[str, int]] = arbiter_sockaddr
+
+ regstr: Portal
+ async with get_registry(
+ *(regaddr or actor._reg_addrs[0])
+ ) as regstr:
+
+ # TODO: return portals to all available actors - for now
+ # just the last one that registered
+ sockaddr: tuple[str, int] = await regstr.run_from_ns(
'self',
'find_actor',
name=name,
)
-
- # TODO: return portals to all available actors - for now just
- # the last one that registered
- if name == 'arbiter' and actor.is_arbiter:
- raise RuntimeError("The current actor is the arbiter")
-
- yield sockaddr if sockaddr else None
+ yield sockaddr
@acm
async def find_actor(
name: str,
- arbiter_sockaddr: tuple[str, int] | None = None
+ arbiter_sockaddr: tuple[str, int] | None = None,
+ registry_addrs: list[tuple[str, int]] | None = None,
-) -> AsyncGenerator[Optional[Portal], None]:
+ only_first: bool = True,
+
+) -> AsyncGenerator[
+ Portal | list[Portal] | None,
+ None,
+]:
'''
Ask the arbiter to find actor(s) by name.
@@ -120,24 +166,54 @@ async def find_actor(
known to the arbiter.
'''
- async with query_actor(
- name=name,
- arbiter_sockaddr=arbiter_sockaddr,
- ) as sockaddr:
+ if arbiter_sockaddr is not None:
+ warnings.warn(
+ '`tractor.find_actor(arbiter_sockaddr=)` is deprecated.\n'
+ 'Use `registry_addrs: list[tuple]` instead!',
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ registry_addrs: list[tuple[str, int]] = [arbiter_sockaddr]
- if sockaddr:
- async with _connect_chan(*sockaddr) as chan:
- async with open_portal(chan) as portal:
- yield portal
- else:
+ @acm
+ async def maybe_open_portal_from_reg_addr(
+ addr: tuple[str, int],
+ ):
+ async with query_actor(
+ name=name,
+ regaddr=addr,
+ ) as sockaddr:
+ if sockaddr:
+ async with _connect_chan(*sockaddr) as chan:
+ async with open_portal(chan) as portal:
+ yield portal
+ else:
+ yield None
+
+ async with gather_contexts(
+ mngrs=list(
+ maybe_open_portal_from_reg_addr(addr)
+ for addr in registry_addrs
+ )
+ ) as maybe_portals:
+ print(f'Portalz: {maybe_portals}')
+ if not maybe_portals:
yield None
+ return
+
+ portals: list[Portal] = list(maybe_portals)
+ if only_first:
+ yield portals[0]
+
+ else:
+ yield portals
@acm
async def wait_for_actor(
name: str,
arbiter_sockaddr: tuple[str, int] | None = None,
- # registry_addr: tuple[str, int] | None = None,
+ registry_addr: tuple[str, int] | None = None,
) -> AsyncGenerator[Portal, None]:
'''
@@ -146,17 +222,33 @@ async def wait_for_actor(
A portal to the first registered actor is returned.
'''
- actor = current_actor()
+ actor: Actor = current_actor()
- async with get_arbiter(
- *arbiter_sockaddr or actor._arb_addr,
- ) as arb_portal:
- sockaddrs = await arb_portal.run_from_ns(
+ if arbiter_sockaddr is not None:
+ warnings.warn(
+ '`tractor.wait_for_actor(arbiter_sockaddr=)` is deprecated.\n'
+ 'Use `registry_addr: tuple` instead!',
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ registry_addr: list[tuple[str, int]] = [
+ arbiter_sockaddr,
+ ]
+
+ # TODO: use `.trionics.gather_contexts()` like
+ # above in `find_actor()` as well?
+ async with get_registry(
+ *(registry_addr or actor._reg_addrs[0]), # first if not passed
+ ) as reg_portal:
+ sockaddrs = await reg_portal.run_from_ns(
'self',
'wait_for_actor',
name=name,
)
- sockaddr = sockaddrs[-1]
+
+ # get latest registered addr by default?
+ # TODO: offer multi-portal yields in multi-homed case?
+ sockaddr: tuple[str, int] = sockaddrs[-1]
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
diff --git a/tractor/_entry.py b/tractor/_entry.py
index a59975ce..0ac0dc47 100644
--- a/tractor/_entry.py
+++ b/tractor/_entry.py
@@ -47,8 +47,8 @@ log = get_logger(__name__)
def _mp_main(
- actor: Actor, # type: ignore
- accept_addr: tuple[str, int],
+ actor: Actor,
+ accept_addrs: list[tuple[str, int]],
forkserver_info: tuple[Any, Any, Any, Any, Any],
start_method: SpawnMethodKey,
parent_addr: tuple[str, int] | None = None,
@@ -77,8 +77,8 @@ def _mp_main(
log.debug(f"parent_addr is {parent_addr}")
trio_main = partial(
async_main,
- actor,
- accept_addr,
+ actor=actor,
+ accept_addrs=accept_addrs,
parent_addr=parent_addr
)
try:
@@ -96,7 +96,7 @@ def _mp_main(
def _trio_main(
- actor: Actor, # type: ignore
+ actor: Actor,
*,
parent_addr: tuple[str, int] | None = None,
infect_asyncio: bool = False,
diff --git a/tractor/_ipc.py b/tractor/_ipc.py
index e80a1c35..f57d3bd8 100644
--- a/tractor/_ipc.py
+++ b/tractor/_ipc.py
@@ -517,7 +517,9 @@ class Channel:
@acm
async def _connect_chan(
- host: str, port: int
+ host: str,
+ port: int
+
) -> typing.AsyncGenerator[Channel, None]:
'''
Create and connect a channel with disconnect on context manager
diff --git a/tractor/_portal.py b/tractor/_portal.py
index 0ca44483..ac602dd5 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -461,7 +461,12 @@ class LocalPortal:
actor: 'Actor' # type: ignore # noqa
channel: Channel
- async def run_from_ns(self, ns: str, func_name: str, **kwargs) -> Any:
+ async def run_from_ns(
+ self,
+ ns: str,
+ func_name: str,
+ **kwargs,
+ ) -> Any:
'''
Run a requested local function from a namespace path and
return it's result.
diff --git a/tractor/_root.py b/tractor/_root.py
index a1a11d3b..403907ec 100644
--- a/tractor/_root.py
+++ b/tractor/_root.py
@@ -59,10 +59,10 @@ async def open_root_actor(
*,
# defaults are above
- arbiter_addr: tuple[str, int] | None = None,
+ registry_addrs: list[tuple[str, int]] | None = None,
# defaults are above
- registry_addr: tuple[str, int] | None = None,
+ arbiter_addr: tuple[str, int] | None = None,
name: str | None = 'root',
@@ -116,19 +116,19 @@ async def open_root_actor(
if arbiter_addr is not None:
warnings.warn(
- '`arbiter_addr` is now deprecated and has been renamed to'
- '`registry_addr`.\nUse that instead..',
+ '`arbiter_addr` is now deprecated\n'
+ 'Use `registry_addrs: list[tuple]` instead..',
DeprecationWarning,
stacklevel=2,
)
+ registry_addrs = [arbiter_addr]
- registry_addr = (host, port) = (
- registry_addr
- or arbiter_addr
- or (
+ registry_addrs: list[tuple[str, int]] = (
+ registry_addrs
+ or [ # default on localhost
_default_arbiter_host,
_default_arbiter_port,
- )
+ ]
)
loglevel = (
@@ -177,60 +177,105 @@ async def open_root_actor(
'`stackscope` not installed for use in debug mode!'
)
- try:
- # make a temporary connection to see if an arbiter exists,
- # if one can't be made quickly we assume none exists.
- arbiter_found = False
+ # closed over by the ping task-func below
+ ponged_addrs: list[tuple[str, int]] = []
- # TODO: this connect-and-bail forces us to have to carefully
- # rewrap TCP 104-connection-reset errors as EOF so as to avoid
- # propagating cancel-causing errors to the channel-msg loop
- # machinery. Likely it would be better to eventually have
- # a "discovery" protocol with basic handshake instead.
- with trio.move_on_after(1):
- async with _connect_chan(host, port):
- arbiter_found = True
+ async def ping_tpt_socket(
+ addr: tuple[str, int],
+ timeout: float = 1,
+ ) -> None:
+ '''
+ Attempt temporary connection to see if a registry is
+ listening at the requested address by a transport layer
+ ping.
- except OSError:
- # TODO: make this a "discovery" log level?
- logger.warning(f"No actor registry found @ {host}:{port}")
+ If a connection can't be made quickly we assume no
+ server is listening at that addr.
- # create a local actor and start up its main routine/task
- if arbiter_found:
+ '''
+ try:
+ # TODO: this connect-and-bail forces us to have to
+ # carefully rewrap TCP 104-connection-reset errors as
+ # EOF so as to avoid propagating cancel-causing errors
+ # to the channel-msg loop machinery. Likely it would
+ # be better to eventually have a "discovery" protocol
+ # with basic handshake instead?
+ with trio.move_on_after(timeout):
+ async with _connect_chan(*addr):
+ ponged_addrs.append(addr)
+
+ except OSError:
+ # TODO: make this a "discovery" log level?
+ logger.warning(f'No actor registry found @ {addr}')
+
+ async with trio.open_nursery() as tn:
+ for addr in registry_addrs:
+ tn.start_soon(ping_tpt_socket, addr)
+
+ trans_bind_addrs: list[tuple[str, int]] = []
+
+ # Create a new local root-actor instance which IS NOT THE
+ # REGISTRAR
+ if ponged_addrs:
# we were able to connect to an arbiter
- logger.info(f"Arbiter seems to exist @ {host}:{port}")
+ logger.info(
+ f'Registry(s) seem(s) to exist @ {ponged_addrs}'
+ )
actor = Actor(
- name or 'anonymous',
- arbiter_addr=registry_addr,
+ name=name or 'anonymous',
+ registry_addrs=ponged_addrs,
loglevel=loglevel,
enable_modules=enable_modules,
)
- host, port = (host, 0)
+ # DO NOT use the registry_addrs as the transport server
+ # addrs for this new non-registrar, root-actor.
+ for host, port in ponged_addrs:
+ # NOTE: zero triggers dynamic OS port allocation
+ trans_bind_addrs.append((host, 0))
+ # Start this local actor as the "registrar", aka a regular
+ # actor who manages the local registry of "mailboxes" of
+ # other process-tree-local sub-actors.
else:
- # start this local actor as the arbiter (aka a regular actor who
- # manages the local registry of "mailboxes")
- # Note that if the current actor is the arbiter it is desirable
- # for it to stay up indefinitely until a re-election process has
- # taken place - which is not implemented yet FYI).
+ # NOTE that if the current actor IS THE REGISTRAR, the
+ # following init steps are taken:
+ # - the transport layer server is bound to each (host, port)
+ # pair defined in provided registry_addrs, or the default.
+ trans_bind_addrs = registry_addrs
+
+ # - it is normally desirable for any registrar to stay up
+ # indefinitely until either all registered (child/sub)
+ # actors are terminated (via SC supervision) or,
+ # a re-election process has taken place.
+ # NOTE: all of ^ which is not implemented yet - see:
+ # https://github.com/goodboy/tractor/issues/216
+ # https://github.com/goodboy/tractor/pull/348
+ # https://github.com/goodboy/tractor/issues/296
actor = Arbiter(
- name or 'arbiter',
- arbiter_addr=registry_addr,
+ name or 'registrar',
+ registry_addrs=registry_addrs,
loglevel=loglevel,
enable_modules=enable_modules,
)
+ # Start up main task set via core actor-runtime nurseries.
try:
# assign process-local actor
_state._current_actor = actor
# start local channel-server and fake the portal API
# NOTE: this won't block since we provide the nursery
- logger.info(f"Starting local {actor} @ {host}:{port}")
+ ml_addrs_str: str = '\n'.join(
+ f'@{addr}' for addr in trans_bind_addrs
+ )
+ logger.info(
+ f'Starting local {actor.uid} on the following transport addrs:\n'
+ f'{ml_addrs_str}'
+ )
# start the actor runtime in a new task
async with trio.open_nursery() as nursery:
@@ -243,7 +288,7 @@ async def open_root_actor(
partial(
async_main,
actor,
- accept_addr=(host, port),
+ accept_addrs=trans_bind_addrs,
parent_addr=None
)
)
@@ -255,7 +300,7 @@ async def open_root_actor(
BaseExceptionGroup,
) as err:
- entered = await _debug._maybe_enter_pm(err)
+ entered: bool = await _debug._maybe_enter_pm(err)
if (
not entered
and
@@ -263,7 +308,8 @@ async def open_root_actor(
):
logger.exception('Root actor crashed:\n')
- # always re-raise
+ # ALWAYS re-raise any error bubbled up from the
+ # runtime!
raise
finally:
@@ -284,7 +330,7 @@ async def open_root_actor(
_state._current_actor = None
_state._last_actor_terminated = actor
- # restore breakpoint hook state
+ # restore built-in `breakpoint()` hook state
sys.breakpointhook = builtin_bp_handler
if orig_bp_path is not None:
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
@@ -300,10 +346,9 @@ def run_daemon(
# runtime kwargs
name: str | None = 'root',
- registry_addr: tuple[str, int] = (
- _default_arbiter_host,
- _default_arbiter_port,
- ),
+ registry_addrs: list[tuple[str, int]] = [
+ (_default_arbiter_host, _default_arbiter_port)
+ ],
start_method: str | None = None,
debug_mode: bool = False,
@@ -327,7 +372,7 @@ def run_daemon(
async def _main():
async with open_root_actor(
- registry_addr=registry_addr,
+ registry_addrs=registry_addrs,
name=name,
start_method=start_method,
debug_mode=debug_mode,
diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index 6b3d9461..d2a9e405 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -45,6 +45,7 @@ from functools import partial
from itertools import chain
import importlib
import importlib.util
+import os
from pprint import pformat
import signal
import sys
@@ -55,7 +56,7 @@ from typing import (
)
import uuid
from types import ModuleType
-import os
+import warnings
import trio
from trio import (
@@ -77,8 +78,8 @@ from ._exceptions import (
ContextCancelled,
TransportClosed,
)
-from ._discovery import get_arbiter
from .devx import _debug
+from ._discovery import get_registry
from ._portal import Portal
from . import _state
from . import _mp_fixup_main
@@ -127,6 +128,11 @@ class Actor:
# ugh, we need to get rid of this and replace with a "registry" sys
# https://github.com/goodboy/tractor/issues/216
is_arbiter: bool = False
+
+ @property
+ def is_registrar(self) -> bool:
+ return self.is_arbiter
+
msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()` after fork
@@ -164,8 +170,12 @@ class Actor:
enable_modules: list[str] = [],
uid: str | None = None,
loglevel: str | None = None,
+ registry_addrs: list[tuple[str, int]] | None = None,
+ spawn_method: str | None = None,
+
+ # TODO: remove!
arbiter_addr: tuple[str, int] | None = None,
- spawn_method: str | None = None
+
) -> None:
'''
This constructor is called in the parent actor **before** the spawning
@@ -189,27 +199,36 @@ class Actor:
# always include debugging tools module
enable_modules.append('tractor.devx._debug')
- mods = {}
+ self.enable_modules: dict[str, str] = {}
for name in enable_modules:
- mod = importlib.import_module(name)
- mods[name] = _get_mod_abspath(mod)
+ mod: ModuleType = importlib.import_module(name)
+ self.enable_modules[name] = _get_mod_abspath(mod)
- self.enable_modules = mods
self._mods: dict[str, ModuleType] = {}
- self.loglevel = loglevel
+ self.loglevel: str = loglevel
- self._arb_addr: tuple[str, int] | None = (
- str(arbiter_addr[0]),
- int(arbiter_addr[1])
- ) if arbiter_addr else None
+ if arbiter_addr is not None:
+ warnings.warn(
+ '`Actor(arbiter_addr=)` is now deprecated.\n'
+ 'Use `registry_addrs: list[tuple]` instead.',
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ registry_addrs: list[tuple[str, int]] = [arbiter_addr]
+
+ self._reg_addrs: list[tuple[str, int]] = (
+ registry_addrs
+ or
+ None
+ )
# marked by the process spawning backend at startup
# will be None for the parent most process started manually
# by the user (currently called the "arbiter")
- self._spawn_method = spawn_method
+ self._spawn_method: str = spawn_method
self._peers: defaultdict = defaultdict(list)
- self._peer_connected: dict = {}
+ self._peer_connected: dict[tuple[str, str], trio.Event] = {}
self._no_more_peers = trio.Event()
self._no_more_peers.set()
self._ongoing_rpc_tasks = trio.Event()
@@ -336,6 +355,12 @@ class Actor:
self._no_more_peers = trio.Event() # unset by making new
chan = Channel.from_stream(stream)
their_uid: tuple[str, str]|None = chan.uid
+ if their_uid:
+ log.warning(
+ f'Re-connection from already known {their_uid}'
+ )
+ else:
+ log.runtime(f'New connection to us @{chan.raddr}')
con_msg: str = ''
if their_uid:
@@ -880,11 +905,11 @@ class Actor:
)
await chan.connect()
+ # TODO: move this into a `Channel.handshake()`?
# Initial handshake: swap names.
await self._do_handshake(chan)
- accept_addr: tuple[str, int] | None = None
-
+ accept_addrs: list[tuple[str, int]] | None = None
if self._spawn_method == "trio":
# Receive runtime state from our parent
parent_data: dict[str, Any]
@@ -897,10 +922,7 @@ class Actor:
# if "trace"/"util" mode is enabled?
f'{pformat(parent_data)}\n'
)
- accept_addr = (
- parent_data.pop('bind_host'),
- parent_data.pop('bind_port'),
- )
+ accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs')
rvs = parent_data.pop('_runtime_vars')
if rvs['_debug_mode']:
@@ -919,17 +941,18 @@ class Actor:
for attr, value in parent_data.items():
- if attr == '_arb_addr':
+ if attr == '_reg_addrs':
# XXX: ``msgspec`` doesn't support serializing tuples
# so just cash manually here since it's what our
# internals expect.
- value = tuple(value) if value else None
- self._arb_addr = value
+ self._reg_addrs = [
+ tuple(val) for val in value
+ ] if value else None
else:
setattr(self, attr, value)
- return chan, accept_addr
+ return chan, accept_addrs
except OSError: # failed to connect
log.warning(
@@ -946,8 +969,8 @@ class Actor:
handler_nursery: Nursery,
*,
# (host, port) to bind for channel server
- accept_host: tuple[str, int] | None = None,
- accept_port: int = 0,
+ listen_sockaddrs: list[tuple[str, int]] | None = None,
+
task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
@@ -958,30 +981,40 @@ class Actor:
`.cancel_server()` is called.
'''
+ if listen_sockaddrs is None:
+ listen_sockaddrs = [(None, 0)]
+
self._server_down = trio.Event()
try:
async with trio.open_nursery() as server_n:
- listeners: list[trio.abc.Listener] = await server_n.start(
- partial(
- trio.serve_tcp,
- self._stream_handler,
- # new connections will stay alive even if this server
- # is cancelled
- handler_nursery=handler_nursery,
- port=accept_port,
- host=accept_host,
+
+ for host, port in listen_sockaddrs:
+ listeners: list[trio.abc.Listener] = await server_n.start(
+ partial(
+ trio.serve_tcp,
+
+ handler=self._stream_handler,
+ port=port,
+ host=host,
+
+ # NOTE: configured such that new
+ # connections will stay alive even if
+ # this server is cancelled!
+ handler_nursery=handler_nursery,
+ )
)
- )
- sockets: list[trio.socket] = [
- getattr(listener, 'socket', 'unknown socket')
- for listener in listeners
- ]
- log.runtime(
- 'Started TCP server(s)\n'
- f'|_{sockets}\n'
- )
- self._listeners.extend(listeners)
+ sockets: list[trio.socket] = [
+ getattr(listener, 'socket', 'unknown socket')
+ for listener in listeners
+ ]
+ log.runtime(
+ 'Started TCP server(s)\n'
+ f'|_{sockets}\n'
+ )
+ self._listeners.extend(listeners)
+
task_status.started(server_n)
+
finally:
# signal the server is down since nursery above terminated
self._server_down.set()
@@ -1318,6 +1351,19 @@ class Actor:
log.runtime("Shutting down channel server")
self._server_n.cancel_scope.cancel()
+ @property
+ def accept_addrs(self) -> list[tuple[str, int]]:
+ '''
+ All addresses to which the transport-channel server binds
+ and listens for new connections.
+
+ '''
+ # throws OSError on failure
+ return [
+ listener.socket.getsockname()
+ for listener in self._listeners
+ ] # type: ignore
+
@property
def accept_addr(self) -> tuple[str, int]:
'''
@@ -1326,7 +1372,7 @@ class Actor:
'''
# throws OSError on failure
- return self._listeners[0].socket.getsockname() # type: ignore
+ return self.accept_addrs[0]
def get_parent(self) -> Portal:
'''
@@ -1343,6 +1389,7 @@ class Actor:
'''
return self._peers[uid]
+ # TODO: move to `Channel.handshake(uid)`
async def _do_handshake(
self,
chan: Channel
@@ -1379,7 +1426,7 @@ class Actor:
async def async_main(
actor: Actor,
- accept_addr: tuple[str, int] | None = None,
+ accept_addrs: tuple[str, int] | None = None,
# XXX: currently ``parent_addr`` is only needed for the
# ``multiprocessing`` backend (which pickles state sent to
@@ -1407,20 +1454,25 @@ async def async_main(
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
- registered_with_arbiter = False
+ is_registered: bool = False
try:
# establish primary connection with immediate parent
- actor._parent_chan = None
+ actor._parent_chan: Channel | None = None
if parent_addr is not None:
- actor._parent_chan, accept_addr_rent = await actor._from_parent(
- parent_addr)
+ (
+ actor._parent_chan,
+ set_accept_addr_says_rent,
+ ) = await actor._from_parent(parent_addr)
- # either it's passed in because we're not a child
- # or because we're running in mp mode
- if accept_addr_rent is not None:
- accept_addr = accept_addr_rent
+ # either it's passed in because we're not a child or
+ # because we're running in mp mode
+ if (
+ set_accept_addr_says_rent
+ and set_accept_addr_says_rent is not None
+ ):
+ accept_addrs = set_accept_addr_says_rent
# The "root" nursery ensures the channel with the immediate
# parent is kept alive as a resilient service until
@@ -1460,34 +1512,58 @@ async def async_main(
# - subactor: the bind address is sent by our parent
# over our established channel
# - root actor: the ``accept_addr`` passed to this method
- assert accept_addr
- host, port = accept_addr
+ assert accept_addrs
actor._server_n = await service_nursery.start(
partial(
actor._serve_forever,
service_nursery,
- accept_host=host,
- accept_port=port
+ listen_sockaddrs=accept_addrs,
)
)
- accept_addr = actor.accept_addr
+ accept_addrs: list[tuple[str, int]] = actor.accept_addrs
+
+ # NOTE: only set the loopback addr for the
+ # process-tree-global "root" mailbox since
+ # all sub-actors should be able to speak to
+ # their root actor over that channel.
if _state._runtime_vars['_is_root']:
- _state._runtime_vars['_root_mailbox'] = accept_addr
+ for addr in accept_addrs:
+ host, _ = addr
+ # TODO: generic 'lo' detector predicate
+ if '127.0.0.1' in host:
+ _state._runtime_vars['_root_mailbox'] = addr
# Register with the arbiter if we're told its addr
- log.runtime(f"Registering {actor} for role `{actor.name}`")
- assert isinstance(actor._arb_addr, tuple)
+ log.runtime(
+ f'Registering `{actor.name}` ->\n'
+ f'{pformat(accept_addrs)}'
+ )
- async with get_arbiter(*actor._arb_addr) as arb_portal:
- await arb_portal.run_from_ns(
- 'self',
- 'register_actor',
- uid=actor.uid,
- sockaddr=accept_addr,
- )
+ # TODO: ideally we don't fan out to all registrars
+ # if addresses point to the same actor..
+ # So we need a way to detect that? maybe iterate
+ # only on unique actor uids?
+ for addr in actor._reg_addrs:
+ assert isinstance(addr, tuple)
+ assert addr[1] # non-zero after bind
- registered_with_arbiter = True
+ async with get_registry(*addr) as reg_portal:
+ for accept_addr in accept_addrs:
+
+ if not accept_addr[1]:
+ await _debug.pause()
+
+ assert accept_addr[1]
+
+ await reg_portal.run_from_ns(
+ 'self',
+ 'register_actor',
+ uid=actor.uid,
+ sockaddr=accept_addr,
+ )
+
+ is_registered: bool = True
# init steps complete
task_status.started()
@@ -1520,18 +1596,18 @@ async def async_main(
log.runtime("Closing all actor lifetime contexts")
actor.lifetime_stack.close()
- if not registered_with_arbiter:
+ if not is_registered:
# TODO: I guess we could try to connect back
# to the parent through a channel and engage a debugger
# once we have that all working with std streams locking?
log.exception(
f"Actor errored and failed to register with arbiter "
- f"@ {actor._arb_addr}?")
+ f"@ {actor._reg_addrs[0]}?")
log.error(
"\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n"
"\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n"
- "\tYOUR PARENT CODE IS GOING TO KEEP WORKING FINE!!!\n"
- "\tTHIS IS HOW RELIABlE SYSTEMS ARE SUPPOSED TO WORK!?!?\n"
+ "\tIf this is a sub-actor likely its parent will keep running "
+ "\tcorrectly if this error is caught and ignored.."
)
if actor._parent_chan:
@@ -1571,27 +1647,33 @@ async def async_main(
# Unregister actor from the registry-sys / registrar.
if (
- registered_with_arbiter
- and not actor.is_arbiter
+ is_registered
+ and not actor.is_registrar
):
- failed = False
- assert isinstance(actor._arb_addr, tuple)
- with trio.move_on_after(0.5) as cs:
- cs.shield = True
- try:
- async with get_arbiter(*actor._arb_addr) as arb_portal:
- await arb_portal.run_from_ns(
- 'self',
- 'unregister_actor',
- uid=actor.uid
- )
- except OSError:
+ failed: bool = False
+ for addr in actor._reg_addrs:
+ assert isinstance(addr, tuple)
+ with trio.move_on_after(0.5) as cs:
+ cs.shield = True
+ try:
+ async with get_registry(
+ *addr,
+ ) as reg_portal:
+ await reg_portal.run_from_ns(
+ 'self',
+ 'unregister_actor',
+ uid=actor.uid
+ )
+ except OSError:
+ failed = True
+ if cs.cancelled_caught:
failed = True
- if cs.cancelled_caught:
- failed = True
- if failed:
- log.warning(
- f"Failed to unregister {actor.name} from arbiter")
+
+ if failed:
+ log.warning(
+ f'Failed to unregister {actor.name} from '
+ f'registar @ {addr}'
+ )
# Ensure all peers (actors connected to us as clients) are finished
if not actor._no_more_peers.is_set():
@@ -1610,18 +1692,36 @@ async def async_main(
# TODO: rename to `Registry` and move to `._discovery`!
class Arbiter(Actor):
'''
- A special actor who knows all the other actors and always has
- access to a top level nursery.
+ A special registrar actor who can contact all other actors
+ within its immediate process tree and possibly keeps a registry
+ of others meant to be discoverable in a distributed
+ application. Normally the registrar is also the "root actor"
+ and thus always has access to the top-most-level actor
+ (process) nursery.
- The arbiter is by default the first actor spawned on each host
- and is responsible for keeping track of all other actors for
- coordination purposes. If a new main process is launched and an
- arbiter is already running that arbiter will be used.
+ By default, the registrar is always initialized when and if no
+ other registrar socket addrs have been specified to runtime
+ init entry-points (such as `open_root_actor()` or
+ `open_nursery()`). Any time a new main process is launched (and
+ thus a new root actor created) and no existing registrar
+ can be contacted at the provided `registry_addr`, then a new
+ one is always created; however, if one can be reached it is
+ used.
+
+ Normally a distributed app requires at least one registrar per
+ logical host where for that given "host space" (aka localhost
+ IPC domain of addresses) it is responsible for making all other
+ host (local address) bound actors *discoverable* to external
+ actor trees running on remote hosts.
'''
is_arbiter = True
- def __init__(self, *args, **kwargs) -> None:
+ def __init__(
+ self,
+ *args,
+ **kwargs,
+ ) -> None:
self._registry: dict[
tuple[str, str],
@@ -1663,7 +1763,10 @@ class Arbiter(Actor):
# unpacker since we have tuples as keys (not this makes the
# arbiter suscetible to hashdos):
# https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10
- return {'.'.join(key): val for key, val in self._registry.items()}
+ return {
+ '.'.join(key): val
+ for key, val in self._registry.items()
+ }
async def wait_for_actor(
self,
@@ -1706,8 +1809,15 @@ class Arbiter(Actor):
sockaddr: tuple[str, int]
) -> None:
- uid = name, _ = (str(uid[0]), str(uid[1]))
- self._registry[uid] = (str(sockaddr[0]), int(sockaddr[1]))
+ uid = name, hash = (str(uid[0]), str(uid[1]))
+ addr = (host, port) = (
+ str(sockaddr[0]),
+ int(sockaddr[1]),
+ )
+ if port == 0:
+ await _debug.pause()
+ assert port # should never be 0-dynamic-os-alloc
+ self._registry[uid] = addr
# pop and signal all waiter events
events = self._waiters.pop(name, [])
diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index 78c38c84..001a0f10 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -365,7 +365,7 @@ async def new_proc(
errors: dict[tuple[str, str], Exception],
# passed through to actor main
- bind_addr: tuple[str, int],
+ bind_addrs: list[tuple[str, int]],
parent_addr: tuple[str, int],
_runtime_vars: dict[str, Any], # serialized and sent to _child
@@ -387,7 +387,7 @@ async def new_proc(
actor_nursery,
subactor,
errors,
- bind_addr,
+ bind_addrs,
parent_addr,
_runtime_vars, # run time vars
infect_asyncio=infect_asyncio,
@@ -402,7 +402,7 @@ async def trio_proc(
errors: dict[tuple[str, str], Exception],
# passed through to actor main
- bind_addr: tuple[str, int],
+ bind_addrs: list[tuple[str, int]],
parent_addr: tuple[str, int],
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
@@ -491,12 +491,11 @@ async def trio_proc(
# send additional init params
await chan.send({
- "_parent_main_data": subactor._parent_main_data,
- "enable_modules": subactor.enable_modules,
- "_arb_addr": subactor._arb_addr,
- "bind_host": bind_addr[0],
- "bind_port": bind_addr[1],
- "_runtime_vars": _runtime_vars,
+ '_parent_main_data': subactor._parent_main_data,
+ 'enable_modules': subactor.enable_modules,
+ '_reg_addrs': subactor._reg_addrs,
+ 'bind_addrs': bind_addrs,
+ '_runtime_vars': _runtime_vars,
})
# track subactor in current nursery
@@ -602,7 +601,7 @@ async def mp_proc(
subactor: Actor,
errors: dict[tuple[str, str], Exception],
# passed through to actor main
- bind_addr: tuple[str, int],
+ bind_addrs: list[tuple[str, int]],
parent_addr: tuple[str, int],
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
@@ -660,7 +659,7 @@ async def mp_proc(
target=_mp_main,
args=(
subactor,
- bind_addr,
+ bind_addrs,
fs_info,
_spawn_method,
parent_addr,
diff --git a/tractor/_supervise.py b/tractor/_supervise.py
index c8c2336d..615ba692 100644
--- a/tractor/_supervise.py
+++ b/tractor/_supervise.py
@@ -22,10 +22,7 @@ from contextlib import asynccontextmanager as acm
from functools import partial
import inspect
from pprint import pformat
-from typing import (
- Optional,
- TYPE_CHECKING,
-)
+from typing import TYPE_CHECKING
import typing
import warnings
@@ -97,7 +94,7 @@ class ActorNursery:
tuple[
Actor,
trio.Process | mp.Process,
- Optional[Portal],
+ Portal | None,
]
] = {}
# portals spawned with ``run_in_actor()`` are
@@ -121,12 +118,12 @@ class ActorNursery:
self,
name: str,
*,
- bind_addr: tuple[str, int] = _default_bind_addr,
+ bind_addrs: list[tuple[str, int]] = [_default_bind_addr],
rpc_module_paths: list[str] | None = None,
enable_modules: list[str] | None = None,
loglevel: str | None = None, # set log level per subactor
nursery: trio.Nursery | None = None,
- debug_mode: Optional[bool] | None = None,
+ debug_mode: bool | None = None,
infect_asyncio: bool = False,
) -> Portal:
'''
@@ -161,7 +158,9 @@ class ActorNursery:
# modules allowed to invoked funcs from
enable_modules=enable_modules,
loglevel=loglevel,
- arbiter_addr=current_actor()._arb_addr,
+
+ # verbatim relay this actor's registrar addresses
+ registry_addrs=current_actor()._reg_addrs,
)
parent_addr = self._actor.accept_addr
assert parent_addr
@@ -178,7 +177,7 @@ class ActorNursery:
self,
subactor,
self.errors,
- bind_addr,
+ bind_addrs,
parent_addr,
_rtv, # run time vars
infect_asyncio=infect_asyncio,
@@ -191,8 +190,8 @@ class ActorNursery:
fn: typing.Callable,
*,
- name: Optional[str] = None,
- bind_addr: tuple[str, int] = _default_bind_addr,
+ name: str | None = None,
+ bind_addrs: tuple[str, int] = [_default_bind_addr],
rpc_module_paths: list[str] | None = None,
enable_modules: list[str] | None = None,
loglevel: str | None = None, # set log level per subactor
@@ -221,7 +220,7 @@ class ActorNursery:
enable_modules=[mod_path] + (
enable_modules or rpc_module_paths or []
),
- bind_addr=bind_addr,
+ bind_addrs=bind_addrs,
loglevel=loglevel,
# use the run_in_actor nursery
nursery=self._ria_nursery,