Init-support for "multi homed" transports

Since we'd like to eventually allow a diverse set of transport
(protocol) methods and stacks, and a multi-peer discovery system for
distributed actor-tree applications, this reworks all runtime internals
to support multi-homing for any given tree on a logical host. In other
words any actor can now bind its transport server (currently only
unsecured TCP + `msgspec`) to more then one address available in its
(linux) network namespace. Further, registry actors (now dubbed
"registars" instead of "arbiters") can also similarly bind to multiple
network addresses and provide discovery services to remote actors via
multiple addresses which can now be provided at runtime startup.

Deats:
- adjust `._runtime` internals to use a `list[tuple[str, int]]` (and
  thus pluralized) socket address sequence where applicable for transport
  server socket binds, now exposed via `Actor.accept_addrs`:
  - `Actor.__init__()` now takes a `registry_addrs: list`.
  - `Actor.is_arbiter` -> `.is_registrar`.
  - `._arb_addr` -> `._reg_addrs: list[tuple]`.
  - always reg and de-reg from all registrars in `async_main()`.
  - only set the global runtime var `'_root_mailbox'` to the loopback
    address since normally all in-tree processes should have access to
    it, right?
  - `._serve_forever()` task now takes `listen_sockaddrs: list[tuple]`
- make `open_root_actor()` take a `registry_addrs: list[tuple[str, int]]`
  and defaults when not passed.
- change `ActorNursery.start_..()` methods take `bind_addrs: list` and
  pass down through the spawning layer(s) via the parent-seed-msg.
- generalize all `._discovery()` APIs to accept `registry_addrs`-like
  inputs and move all relevant subsystems to adopt the "registry" style
  naming instead of "arbiter":
  - make `find_actor()` support batched concurrent portal queries over
    all provided input addresses using `.trionics.gather_contexts()` Bo
  - syntax: move to using `async with <tuples>` 3.9+ style chained
    @acms.
  - a general modernization of the code to a python 3.9+ style.
  - start deprecation and change to "registry" naming / semantics:
    - `._discovery.get_arbiter()` -> `.get_registry()`
multihomed
Tyler Goodlet 2023-09-27 15:19:30 -04:00
parent ee151b00af
commit 3d0e95513c
8 changed files with 504 additions and 240 deletions

View File

@ -15,16 +15,19 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Actor discovery API.
Discovery (protocols) API for automatic addressing and location
management of (service) actors.
"""
from __future__ import annotations
from typing import (
Optional,
Union,
AsyncGenerator,
TYPE_CHECKING,
)
from contextlib import asynccontextmanager as acm
import warnings
from .trionics import gather_contexts
from ._ipc import _connect_chan, Channel
from ._portal import (
Portal,
@ -34,13 +37,19 @@ from ._portal import (
from ._state import current_actor, _runtime_vars
@acm
async def get_arbiter(
if TYPE_CHECKING:
from ._runtime import Actor
@acm
async def get_registry(
host: str,
port: int,
) -> AsyncGenerator[Union[Portal, LocalPortal], None]:
) -> AsyncGenerator[
Portal | LocalPortal | None,
None,
]:
'''
Return a portal instance connected to a local or remote
arbiter.
@ -51,16 +60,23 @@ async def get_arbiter(
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
if actor.is_arbiter:
if actor.is_registrar:
# we're already the arbiter
# (likely a re-entrant call from the arbiter actor)
yield LocalPortal(actor, Channel((host, port)))
yield LocalPortal(
actor,
Channel((host, port))
)
else:
async with _connect_chan(host, port) as chan:
async with (
_connect_chan(host, port) as chan,
open_portal(chan) as regstr_ptl,
):
yield regstr_ptl
async with open_portal(chan) as arb_portal:
yield arb_portal
# TODO: deprecate and remove _arbiter form
get_arbiter = get_registry
@acm
@ -68,51 +84,81 @@ async def get_root(
**kwargs,
) -> AsyncGenerator[Portal, None]:
# TODO: rename mailbox to `_root_maddr` when we finally
# add and impl libp2p multi-addrs?
host, port = _runtime_vars['_root_mailbox']
assert host is not None
async with _connect_chan(host, port) as chan:
async with open_portal(chan, **kwargs) as portal:
yield portal
async with (
_connect_chan(host, port) as chan,
open_portal(chan, **kwargs) as portal,
):
yield portal
@acm
async def query_actor(
name: str,
arbiter_sockaddr: Optional[tuple[str, int]] = None,
arbiter_sockaddr: tuple[str, int] | None = None,
regaddr: tuple[str, int] | None = None,
) -> AsyncGenerator[tuple[str, int], None]:
) -> AsyncGenerator[
tuple[str, int] | None,
None,
]:
'''
Simple address lookup for a given actor name.
Make a transport address lookup for an actor name to a specific
registrar.
Returns the (socket) address or ``None``.
Returns the (socket) address or ``None`` if no entry under that
name exists for the given registrar listening @ `regaddr`.
'''
actor = current_actor()
async with get_arbiter(
*arbiter_sockaddr or actor._arb_addr
) as arb_portal:
actor: Actor = current_actor()
if (
name == 'registrar'
and actor.is_registrar
):
raise RuntimeError(
'The current actor IS the registry!?'
)
sockaddr = await arb_portal.run_from_ns(
if arbiter_sockaddr is not None:
warnings.warn(
'`tractor.query_actor(regaddr=<blah>)` is deprecated.\n'
'Use `registry_addrs: list[tuple]` instead!',
DeprecationWarning,
stacklevel=2,
)
regaddr: list[tuple[str, int]] = arbiter_sockaddr
regstr: Portal
async with get_registry(
*(regaddr or actor._reg_addrs[0])
) as regstr:
# TODO: return portals to all available actors - for now
# just the last one that registered
sockaddr: tuple[str, int] = await regstr.run_from_ns(
'self',
'find_actor',
name=name,
)
# TODO: return portals to all available actors - for now just
# the last one that registered
if name == 'arbiter' and actor.is_arbiter:
raise RuntimeError("The current actor is the arbiter")
yield sockaddr if sockaddr else None
yield sockaddr
@acm
async def find_actor(
name: str,
arbiter_sockaddr: tuple[str, int] | None = None
arbiter_sockaddr: tuple[str, int] | None = None,
registry_addrs: list[tuple[str, int]] | None = None,
) -> AsyncGenerator[Optional[Portal], None]:
only_first: bool = True,
) -> AsyncGenerator[
Portal | list[Portal] | None,
None,
]:
'''
Ask the arbiter to find actor(s) by name.
@ -120,24 +166,54 @@ async def find_actor(
known to the arbiter.
'''
async with query_actor(
name=name,
arbiter_sockaddr=arbiter_sockaddr,
) as sockaddr:
if arbiter_sockaddr is not None:
warnings.warn(
'`tractor.find_actor(arbiter_sockaddr=<blah>)` is deprecated.\n'
'Use `registry_addrs: list[tuple]` instead!',
DeprecationWarning,
stacklevel=2,
)
registry_addrs: list[tuple[str, int]] = [arbiter_sockaddr]
if sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
@acm
async def maybe_open_portal_from_reg_addr(
addr: tuple[str, int],
):
async with query_actor(
name=name,
regaddr=addr,
) as sockaddr:
if sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None
async with gather_contexts(
mngrs=list(
maybe_open_portal_from_reg_addr(addr)
for addr in registry_addrs
)
) as maybe_portals:
print(f'Portalz: {maybe_portals}')
if not maybe_portals:
yield None
return
portals: list[Portal] = list(maybe_portals)
if only_first:
yield portals[0]
else:
yield portals
@acm
async def wait_for_actor(
name: str,
arbiter_sockaddr: tuple[str, int] | None = None,
# registry_addr: tuple[str, int] | None = None,
registry_addr: tuple[str, int] | None = None,
) -> AsyncGenerator[Portal, None]:
'''
@ -146,17 +222,33 @@ async def wait_for_actor(
A portal to the first registered actor is returned.
'''
actor = current_actor()
actor: Actor = current_actor()
async with get_arbiter(
*arbiter_sockaddr or actor._arb_addr,
) as arb_portal:
sockaddrs = await arb_portal.run_from_ns(
if arbiter_sockaddr is not None:
warnings.warn(
'`tractor.wait_for_actor(arbiter_sockaddr=<foo>)` is deprecated.\n'
'Use `registry_addr: tuple` instead!',
DeprecationWarning,
stacklevel=2,
)
registry_addr: list[tuple[str, int]] = [
arbiter_sockaddr,
]
# TODO: use `.trionics.gather_contexts()` like
# above in `find_actor()` as well?
async with get_registry(
*(registry_addr or actor._reg_addrs[0]), # first if not passed
) as reg_portal:
sockaddrs = await reg_portal.run_from_ns(
'self',
'wait_for_actor',
name=name,
)
sockaddr = sockaddrs[-1]
# get latest registered addr by default?
# TODO: offer multi-portal yields in multi-homed case?
sockaddr: tuple[str, int] = sockaddrs[-1]
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:

View File

@ -47,8 +47,8 @@ log = get_logger(__name__)
def _mp_main(
actor: Actor, # type: ignore
accept_addr: tuple[str, int],
actor: Actor,
accept_addrs: list[tuple[str, int]],
forkserver_info: tuple[Any, Any, Any, Any, Any],
start_method: SpawnMethodKey,
parent_addr: tuple[str, int] | None = None,
@ -77,8 +77,8 @@ def _mp_main(
log.debug(f"parent_addr is {parent_addr}")
trio_main = partial(
async_main,
actor,
accept_addr,
actor=actor,
accept_addrs=accept_addrs,
parent_addr=parent_addr
)
try:
@ -96,7 +96,7 @@ def _mp_main(
def _trio_main(
actor: Actor, # type: ignore
actor: Actor,
*,
parent_addr: tuple[str, int] | None = None,
infect_asyncio: bool = False,
@ -132,7 +132,9 @@ def _trio_main(
else:
trio.run(trio_main)
except KeyboardInterrupt:
log.cancel(f"Actor {actor.uid} received KBI")
log.cancel(
f'Actor@{actor.uid} received KBI'
)
finally:
log.info(f"Actor {actor.uid} terminated")

View File

@ -467,7 +467,9 @@ class Channel:
@asynccontextmanager
async def _connect_chan(
host: str, port: int
host: str,
port: int
) -> typing.AsyncGenerator[Channel, None]:
'''
Create and connect a channel with disconnect on context manager

View File

@ -586,7 +586,12 @@ class LocalPortal:
actor: 'Actor' # type: ignore # noqa
channel: Channel
async def run_from_ns(self, ns: str, func_name: str, **kwargs) -> Any:
async def run_from_ns(
self,
ns: str,
func_name: str,
**kwargs,
) -> Any:
'''
Run a requested local function from a namespace path and
return it's result.

View File

@ -58,10 +58,10 @@ async def open_root_actor(
*,
# defaults are above
arbiter_addr: tuple[str, int] | None = None,
registry_addrs: list[tuple[str, int]] | None = None,
# defaults are above
registry_addr: tuple[str, int] | None = None,
arbiter_addr: tuple[str, int] | None = None,
name: str | None = 'root',
@ -115,19 +115,19 @@ async def open_root_actor(
if arbiter_addr is not None:
warnings.warn(
'`arbiter_addr` is now deprecated and has been renamed to'
'`registry_addr`.\nUse that instead..',
'`arbiter_addr` is now deprecated\n'
'Use `registry_addrs: list[tuple]` instead..',
DeprecationWarning,
stacklevel=2,
)
registry_addrs = [arbiter_addr]
registry_addr = (host, port) = (
registry_addr
or arbiter_addr
or (
registry_addrs: list[tuple[str, int]] = (
registry_addrs
or [ # default on localhost
_default_arbiter_host,
_default_arbiter_port,
)
]
)
loglevel = (loglevel or log._default_loglevel).upper()
@ -157,60 +157,105 @@ async def open_root_actor(
log.get_console_log(loglevel)
try:
# make a temporary connection to see if an arbiter exists,
# if one can't be made quickly we assume none exists.
arbiter_found = False
# closed into below ping task-func
ponged_addrs: list[tuple[str, int]] = []
# TODO: this connect-and-bail forces us to have to carefully
# rewrap TCP 104-connection-reset errors as EOF so as to avoid
# propagating cancel-causing errors to the channel-msg loop
# machinery. Likely it would be better to eventually have
# a "discovery" protocol with basic handshake instead.
with trio.move_on_after(1):
async with _connect_chan(host, port):
arbiter_found = True
async def ping_tpt_socket(
addr: tuple[str, int],
timeout: float = 1,
) -> None:
'''
Attempt temporary connection to see if a registry is
listening at the requested address by a tranport layer
ping.
except OSError:
# TODO: make this a "discovery" log level?
logger.warning(f"No actor registry found @ {host}:{port}")
If a connection can't be made quickly we assume none no
server is listening at that addr.
# create a local actor and start up its main routine/task
if arbiter_found:
'''
try:
# TODO: this connect-and-bail forces us to have to
# carefully rewrap TCP 104-connection-reset errors as
# EOF so as to avoid propagating cancel-causing errors
# to the channel-msg loop machinery. Likely it would
# be better to eventually have a "discovery" protocol
# with basic handshake instead?
with trio.move_on_after(timeout):
async with _connect_chan(*addr):
ponged_addrs.append(addr)
except OSError:
# TODO: make this a "discovery" log level?
logger.warning(f'No actor registry found @ {addr}')
async with trio.open_nursery() as tn:
for addr in registry_addrs:
tn.start_soon(ping_tpt_socket, addr)
trans_bind_addrs: list[tuple[str, int]] = []
# Create a new local root-actor instance which IS NOT THE
# REGISTRAR
if ponged_addrs:
# we were able to connect to an arbiter
logger.info(f"Arbiter seems to exist @ {host}:{port}")
logger.info(
f'Registry(s) seem(s) to exist @ {ponged_addrs}'
)
actor = Actor(
name or 'anonymous',
arbiter_addr=registry_addr,
name=name or 'anonymous',
registry_addrs=ponged_addrs,
loglevel=loglevel,
enable_modules=enable_modules,
)
host, port = (host, 0)
# DO NOT use the registry_addrs as the transport server
# addrs for this new non-registar, root-actor.
for host, port in ponged_addrs:
# NOTE: zero triggers dynamic OS port allocation
trans_bind_addrs.append((host, 0))
# Start this local actor as the "registrar", aka a regular
# actor who manages the local registry of "mailboxes" of
# other process-tree-local sub-actors.
else:
# start this local actor as the arbiter (aka a regular actor who
# manages the local registry of "mailboxes")
# Note that if the current actor is the arbiter it is desirable
# for it to stay up indefinitely until a re-election process has
# taken place - which is not implemented yet FYI).
# NOTE that if the current actor IS THE REGISTAR, the
# following init steps are taken:
# - the tranport layer server is bound to each (host, port)
# pair defined in provided registry_addrs, or the default.
trans_bind_addrs = registry_addrs
# - it is normally desirable for any registrar to stay up
# indefinitely until either all registered (child/sub)
# actors are terminated (via SC supervision) or,
# a re-election process has taken place.
# NOTE: all of ^ which is not implemented yet - see:
# https://github.com/goodboy/tractor/issues/216
# https://github.com/goodboy/tractor/pull/348
# https://github.com/goodboy/tractor/issues/296
actor = Arbiter(
name or 'arbiter',
arbiter_addr=registry_addr,
name or 'registrar',
registry_addrs=registry_addrs,
loglevel=loglevel,
enable_modules=enable_modules,
)
# Start up main task set via core actor-runtime nurseries.
try:
# assign process-local actor
_state._current_actor = actor
# start local channel-server and fake the portal API
# NOTE: this won't block since we provide the nursery
logger.info(f"Starting local {actor} @ {host}:{port}")
ml_addrs_str: str = '\n'.join(
f'@{addr}' for addr in trans_bind_addrs
)
logger.info(
f'Starting local {actor.uid} on the following transport addrs:\n'
f'{ml_addrs_str}'
)
# start the actor runtime in a new task
async with trio.open_nursery() as nursery:
@ -223,7 +268,7 @@ async def open_root_actor(
partial(
async_main,
actor,
accept_addr=(host, port),
accept_addrs=trans_bind_addrs,
parent_addr=None
)
)
@ -235,13 +280,16 @@ async def open_root_actor(
BaseExceptionGroup,
) as err:
entered: bool = await _debug._maybe_enter_pm(err)
if (
not (await _debug._maybe_enter_pm(err))
not entered
and not is_multi_cancelled(err)
):
logger.exception("Root actor crashed:")
# always re-raise
# ALWAYS re-raise any error bubbled up from the
# runtime!
raise
finally:
@ -261,7 +309,7 @@ async def open_root_actor(
finally:
_state._current_actor = None
# restore breakpoint hook state
# restore built-in `breakpoint()` hook state
sys.breakpointhook = builtin_bp_handler
if orig_bp_path is not None:
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
@ -277,10 +325,9 @@ def run_daemon(
# runtime kwargs
name: str | None = 'root',
registry_addr: tuple[str, int] = (
_default_arbiter_host,
_default_arbiter_port,
),
registry_addrs: list[tuple[str, int]] = [
(_default_arbiter_host, _default_arbiter_port)
],
start_method: str | None = None,
debug_mode: bool = False,
@ -304,7 +351,7 @@ def run_daemon(
async def _main():
async with open_root_actor(
registry_addr=registry_addr,
registry_addrs=registry_addrs,
name=name,
start_method=start_method,
debug_mode=debug_mode,

View File

@ -25,12 +25,12 @@ from itertools import chain
import importlib
import importlib.util
import inspect
from pprint import pformat
import signal
import sys
from typing import (
Any,
Callable,
Union,
TYPE_CHECKING,
)
import uuid
@ -59,7 +59,7 @@ from ._exceptions import (
TransportClosed,
)
from . import _debug
from ._discovery import get_arbiter
from ._discovery import get_registry
from ._portal import Portal
from . import _state
from . import _mp_fixup_main
@ -82,7 +82,7 @@ async def _invoke(
is_rpc: bool = True,
task_status: TaskStatus[
Union[Context, BaseException]
Context | BaseException
] = trio.TASK_STATUS_IGNORED,
):
'''
@ -96,8 +96,14 @@ async def _invoke(
failed_resp: bool = False
if _state.debug_mode():
import greenback
await greenback.ensure_portal()
try:
import greenback
await greenback.ensure_portal()
except ModuleNotFoundError:
log.warning(
'`greenback` is not installed.\n'
'No sync debug support!'
)
# possibly a traceback (not sure what typing is for this..)
tb = None
@ -416,13 +422,13 @@ async def _invoke(
actor._ongoing_rpc_tasks.set()
def _get_mod_abspath(module):
def _get_mod_abspath(module: ModuleType) -> str:
return os.path.abspath(module.__file__)
async def try_ship_error_to_parent(
channel: Channel,
err: Union[Exception, BaseExceptionGroup],
err: Exception | BaseExceptionGroup,
) -> None:
with trio.CancelScope(shield=True):
@ -469,6 +475,11 @@ class Actor:
# ugh, we need to get rid of this and replace with a "registry" sys
# https://github.com/goodboy/tractor/issues/216
is_arbiter: bool = False
@property
def is_registrar(self) -> bool:
return self.is_arbiter
msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()` after fork
@ -501,8 +512,12 @@ class Actor:
enable_modules: list[str] = [],
uid: str | None = None,
loglevel: str | None = None,
registry_addrs: list[tuple[str, int]] | None = None,
spawn_method: str | None = None,
# TODO: remove!
arbiter_addr: tuple[str, int] | None = None,
spawn_method: str | None = None
) -> None:
'''
This constructor is called in the parent actor **before** the spawning
@ -523,27 +538,36 @@ class Actor:
# always include debugging tools module
enable_modules.append('tractor._debug')
mods = {}
self.enable_modules: dict[str, str] = {}
for name in enable_modules:
mod = importlib.import_module(name)
mods[name] = _get_mod_abspath(mod)
mod: ModuleType = importlib.import_module(name)
self.enable_modules[name] = _get_mod_abspath(mod)
self.enable_modules = mods
self._mods: dict[str, ModuleType] = {}
self.loglevel = loglevel
self.loglevel: str = loglevel
self._arb_addr: tuple[str, int] | None = (
str(arbiter_addr[0]),
int(arbiter_addr[1])
) if arbiter_addr else None
if arbiter_addr is not None:
warnings.warn(
'`Actor(arbiter_addr=<blah>)` is now deprecated.\n'
'Use `registry_addrs: list[tuple]` instead.',
DeprecationWarning,
stacklevel=2,
)
registry_addrs: list[tuple[str, int]] = [arbiter_addr]
self._reg_addrs: list[tuple[str, int]] = (
registry_addrs
or
None
)
# marked by the process spawning backend at startup
# will be None for the parent most process started manually
# by the user (currently called the "arbiter")
self._spawn_method = spawn_method
self._spawn_method: str = spawn_method
self._peers: defaultdict = defaultdict(list)
self._peer_connected: dict = {}
self._peer_connected: dict[tuple[str, str], trio.Event] = {}
self._no_more_peers = trio.Event()
self._no_more_peers.set()
self._ongoing_rpc_tasks = trio.Event()
@ -654,13 +678,17 @@ class Actor:
self._no_more_peers = trio.Event() # unset
chan = Channel.from_stream(stream)
uid: tuple[str, str] | None = chan.uid
log.runtime(f"New connection to us {chan}")
their_uid: tuple[str, str] | None = chan.uid
if their_uid:
log.warning(
f'Re-connection from already known {their_uid}'
)
else:
log.runtime(f'New connection to us @{chan.raddr}')
# send/receive initial handshake response
try:
uid = await self._do_handshake(chan)
except (
# we need this for ``msgspec`` for some reason?
# for now, it's been put in the stream backend.
@ -956,7 +984,11 @@ class Actor:
async def _from_parent(
self,
parent_addr: tuple[str, int] | None,
) -> tuple[Channel, tuple[str, int] | None]:
) -> tuple[
Channel,
list[tuple[str, int]] | None,
]:
try:
# Connect back to the parent actor and conduct initial
# handshake. From this point on if we error, we
@ -966,11 +998,11 @@ class Actor:
)
await chan.connect()
# TODO: move this into a `Channel.handshake()`?
# Initial handshake: swap names.
await self._do_handshake(chan)
accept_addr: tuple[str, int] | None = None
accept_addrs: list[tuple[str, int]] | None = None
if self._spawn_method == "trio":
# Receive runtime state from our parent
parent_data: dict[str, Any]
@ -979,10 +1011,7 @@ class Actor:
"Received state from parent:\n"
f"{parent_data}"
)
accept_addr = (
parent_data.pop('bind_host'),
parent_data.pop('bind_port'),
)
accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs')
rvs = parent_data.pop('_runtime_vars')
log.runtime(f"Runtime vars are: {rvs}")
rvs['_is_root'] = False
@ -990,17 +1019,18 @@ class Actor:
for attr, value in parent_data.items():
if attr == '_arb_addr':
if attr == '_reg_addrs':
# XXX: ``msgspec`` doesn't support serializing tuples
# so just cash manually here since it's what our
# internals expect.
value = tuple(value) if value else None
self._arb_addr = value
self._reg_addrs = [
tuple(val) for val in value
] if value else None
else:
setattr(self, attr, value)
return chan, accept_addr
return chan, accept_addrs
except OSError: # failed to connect
log.warning(
@ -1014,8 +1044,8 @@ class Actor:
handler_nursery: trio.Nursery,
*,
# (host, port) to bind for channel server
accept_host: tuple[str, int] | None = None,
accept_port: int = 0,
listen_sockaddrs: list[tuple[str, int]] | None = None,
task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
@ -1025,28 +1055,39 @@ class Actor:
``cancel_server()`` is called.
'''
if listen_sockaddrs is None:
listen_sockaddrs = [(None, 0)]
self._server_down = trio.Event()
try:
async with trio.open_nursery() as server_n:
listeners: list[trio.abc.Listener] = await server_n.start(
partial(
trio.serve_tcp,
self._stream_handler,
# new connections will stay alive even if this server
# is cancelled
handler_nursery=handler_nursery,
port=accept_port,
host=accept_host,
for host, port in listen_sockaddrs:
listeners: list[trio.abc.Listener] = await server_n.start(
partial(
trio.serve_tcp,
handler=self._stream_handler,
port=port,
host=host,
# NOTE: configured such that new
# connections will stay alive even if
# this server is cancelled!
handler_nursery=handler_nursery,
)
)
)
sockets: list[trio.socket] = [
getattr(listener, 'socket', 'unknown socket')
for listener in listeners
]
log.runtime(
f'Started tcp server(s) on {sockets}')
self._listeners.extend(listeners)
sockets: list[trio.socket] = [
getattr(listener, 'socket', 'unknown socket')
for listener in listeners
]
log.runtime(
f'Started tcp server(s) on {sockets}'
)
self._listeners.extend(listeners)
task_status.started(server_n)
finally:
# signal the server is down since nursery above terminated
self._server_down.set()
@ -1226,13 +1267,26 @@ class Actor:
self._server_n.cancel_scope.cancel()
@property
def accept_addr(self) -> tuple[str, int] | None:
def accept_addrs(self) -> list[tuple[str, int]]:
'''
All addresses to which the transport-channel server binds
and listens for new connections.
'''
# throws OSError on failure
return [
listener.socket.getsockname()
for listener in self._listeners
] # type: ignore
@property
def accept_addr(self) -> tuple[str, int]:
'''
Primary address to which the channel server is bound.
'''
# throws OSError on failure
return self._listeners[0].socket.getsockname() # type: ignore
return self.accept_addrs[0]
def get_parent(self) -> Portal:
'''
@ -1249,6 +1303,7 @@ class Actor:
'''
return self._peers[uid]
# TODO: move to `Channel.handshake(uid)`
async def _do_handshake(
self,
chan: Channel
@ -1278,7 +1333,7 @@ class Actor:
async def async_main(
actor: Actor,
accept_addr: tuple[str, int] | None = None,
accept_addrs: tuple[str, int] | None = None,
# XXX: currently ``parent_addr`` is only needed for the
# ``multiprocessing`` backend (which pickles state sent to
@ -1303,20 +1358,25 @@ async def async_main(
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
registered_with_arbiter = False
is_registered: bool = False
try:
# establish primary connection with immediate parent
actor._parent_chan = None
actor._parent_chan: Channel | None = None
if parent_addr is not None:
actor._parent_chan, accept_addr_rent = await actor._from_parent(
parent_addr)
(
actor._parent_chan,
set_accept_addr_says_rent,
) = await actor._from_parent(parent_addr)
# either it's passed in because we're not a child
# or because we're running in mp mode
if accept_addr_rent is not None:
accept_addr = accept_addr_rent
# either it's passed in because we're not a child or
# because we're running in mp mode
if (
set_accept_addr_says_rent
and set_accept_addr_says_rent is not None
):
accept_addrs = set_accept_addr_says_rent
# load exposed/allowed RPC modules
# XXX: do this **after** establishing a channel to the parent
@ -1340,38 +1400,62 @@ async def async_main(
actor._service_n = service_nursery
assert actor._service_n
# Startup up the channel server with,
# Startup up the transport(-channel) server with,
# - subactor: the bind address is sent by our parent
# over our established channel
# - root actor: the ``accept_addr`` passed to this method
assert accept_addr
host, port = accept_addr
assert accept_addrs
actor._server_n = await service_nursery.start(
partial(
actor._serve_forever,
service_nursery,
accept_host=host,
accept_port=port
listen_sockaddrs=accept_addrs,
)
)
accept_addr = actor.accept_addr
accept_addrs: list[tuple[str, int]] = actor.accept_addrs
# NOTE: only set the loopback addr for the
# process-tree-global "root" mailbox since
# all sub-actors should be able to speak to
# their root actor over that channel.
if _state._runtime_vars['_is_root']:
_state._runtime_vars['_root_mailbox'] = accept_addr
for addr in accept_addrs:
host, _ = addr
# TODO: generic 'lo' detector predicate
if '127.0.0.1' in host:
_state._runtime_vars['_root_mailbox'] = addr
# Register with the arbiter if we're told its addr
log.runtime(f"Registering {actor} for role `{actor.name}`")
assert isinstance(actor._arb_addr, tuple)
log.runtime(
f'Registering `{actor.name}` ->\n'
f'{pformat(accept_addrs)}'
)
async with get_arbiter(*actor._arb_addr) as arb_portal:
await arb_portal.run_from_ns(
'self',
'register_actor',
uid=actor.uid,
sockaddr=accept_addr,
)
# TODO: ideally we don't fan out to all registrars
# if addresses point to the same actor..
# So we need a way to detect that? maybe iterate
# only on unique actor uids?
for addr in actor._reg_addrs:
assert isinstance(addr, tuple)
assert addr[1] # non-zero after bind
registered_with_arbiter = True
async with get_registry(*addr) as reg_portal:
for accept_addr in accept_addrs:
if not accept_addr[1]:
await _debug.pause()
assert accept_addr[1]
await reg_portal.run_from_ns(
'self',
'register_actor',
uid=actor.uid,
sockaddr=accept_addr,
)
is_registered: bool = True
# init steps complete
task_status.started()
@ -1401,18 +1485,18 @@ async def async_main(
log.runtime("Closing all actor lifetime contexts")
actor.lifetime_stack.close()
if not registered_with_arbiter:
if not is_registered:
# TODO: I guess we could try to connect back
# to the parent through a channel and engage a debugger
# once we have that all working with std streams locking?
log.exception(
f"Actor errored and failed to register with arbiter "
f"@ {actor._arb_addr}?")
f"@ {actor._reg_addrs[0]}?")
log.error(
"\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n"
"\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n"
"\tYOUR PARENT CODE IS GOING TO KEEP WORKING FINE!!!\n"
"\tTHIS IS HOW RELIABlE SYSTEMS ARE SUPPOSED TO WORK!?!?\n"
"\tIf this is a sub-actor likely its parent will keep running "
"\tcorrectly if this error is caught and ignored.."
)
if actor._parent_chan:
@ -1447,29 +1531,35 @@ async def async_main(
actor.lifetime_stack.close()
# Unregister actor from the arbiter
# Unregister actor from the registry
if (
registered_with_arbiter
and not actor.is_arbiter
is_registered
and not actor.is_registrar
):
failed = False
assert isinstance(actor._arb_addr, tuple)
with trio.move_on_after(0.5) as cs:
cs.shield = True
try:
async with get_arbiter(*actor._arb_addr) as arb_portal:
await arb_portal.run_from_ns(
'self',
'unregister_actor',
uid=actor.uid
)
except OSError:
failed: bool = False
for addr in actor._reg_addrs:
assert isinstance(addr, tuple)
with trio.move_on_after(0.5) as cs:
cs.shield = True
try:
async with get_registry(
*addr,
) as reg_portal:
await reg_portal.run_from_ns(
'self',
'unregister_actor',
uid=actor.uid
)
except OSError:
failed = True
if cs.cancelled_caught:
failed = True
if cs.cancelled_caught:
failed = True
if failed:
log.warning(
f"Failed to unregister {actor.name} from arbiter")
if failed:
log.warning(
f'Failed to unregister {actor.name} from '
f'registar @ {addr}'
)
# Ensure all peers (actors connected to us as clients) are finished
if not actor._no_more_peers.is_set():
@ -1761,18 +1851,36 @@ async def process_messages(
class Arbiter(Actor):
'''
A special actor who knows all the other actors and always has
access to a top level nursery.
A special registrar actor who can contact all other actors
within its immediate process tree and possibly keeps a registry
of others meant to be discoverable in a distributed
application. Normally the registrar is also the "root actor"
and thus always has access to the top-most-level actor
(process) nursery.
The arbiter is by default the first actor spawned on each host
and is responsible for keeping track of all other actors for
coordination purposes. If a new main process is launched and an
arbiter is already running that arbiter will be used.
By default, the registrar is always initialized when and if no
other registrar socket addrs have been specified to runtime
init entry-points (such as `open_root_actor()` or
`open_nursery()`). Any time a new main process is launched (and
thus thus a new root actor created) and, no existing registrar
can be contacted at the provided `registry_addr`, then a new
one is always created; however, if one can be reached it is
used.
Normally a distributed app requires at least registrar per
logical host where for that given "host space" (aka localhost
IPC domain of addresses) it is responsible for making all other
host (local address) bound actors *discoverable* to external
actor trees running on remote hosts.
'''
is_arbiter = True
def __init__(self, *args, **kwargs) -> None:
def __init__(
self,
*args,
**kwargs,
) -> None:
self._registry: dict[
tuple[str, str],
@ -1814,7 +1922,10 @@ class Arbiter(Actor):
# unpacker since we have tuples as keys (not this makes the
# arbiter suscetible to hashdos):
# https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10
return {'.'.join(key): val for key, val in self._registry.items()}
return {
'.'.join(key): val
for key, val in self._registry.items()
}
async def wait_for_actor(
self,
@ -1852,8 +1963,15 @@ class Arbiter(Actor):
sockaddr: tuple[str, int]
) -> None:
uid = name, _ = (str(uid[0]), str(uid[1]))
self._registry[uid] = (str(sockaddr[0]), int(sockaddr[1]))
uid = name, hash = (str(uid[0]), str(uid[1]))
addr = (host, port) = (
str(sockaddr[0]),
int(sockaddr[1]),
)
if port == 0:
await _debug.pause()
assert port # should never be 0-dynamic-os-alloc
self._registry[uid] = addr
# pop and signal all waiter events
events = self._waiters.pop(name, [])

View File

@ -294,7 +294,7 @@ async def new_proc(
errors: dict[tuple[str, str], Exception],
# passed through to actor main
bind_addr: tuple[str, int],
bind_addrs: list[tuple[str, int]],
parent_addr: tuple[str, int],
_runtime_vars: dict[str, Any], # serialized and sent to _child
@ -316,7 +316,7 @@ async def new_proc(
actor_nursery,
subactor,
errors,
bind_addr,
bind_addrs,
parent_addr,
_runtime_vars, # run time vars
infect_asyncio=infect_asyncio,
@ -331,7 +331,7 @@ async def trio_proc(
errors: dict[tuple[str, str], Exception],
# passed through to actor main
bind_addr: tuple[str, int],
bind_addrs: list[tuple[str, int]],
parent_addr: tuple[str, int],
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
@ -417,12 +417,11 @@ async def trio_proc(
# send additional init params
await chan.send({
"_parent_main_data": subactor._parent_main_data,
"enable_modules": subactor.enable_modules,
"_arb_addr": subactor._arb_addr,
"bind_host": bind_addr[0],
"bind_port": bind_addr[1],
"_runtime_vars": _runtime_vars,
'_parent_main_data': subactor._parent_main_data,
'enable_modules': subactor.enable_modules,
'_reg_addrs': subactor._reg_addrs,
'bind_addrs': bind_addrs,
'_runtime_vars': _runtime_vars,
})
# track subactor in current nursery
@ -509,7 +508,7 @@ async def mp_proc(
subactor: Actor,
errors: dict[tuple[str, str], Exception],
# passed through to actor main
bind_addr: tuple[str, int],
bind_addrs: list[tuple[str, int]],
parent_addr: tuple[str, int],
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
@ -567,7 +566,7 @@ async def mp_proc(
target=_mp_main,
args=(
subactor,
bind_addr,
bind_addrs,
fs_info,
_spawn_method,
parent_addr,

View File

@ -21,10 +21,7 @@
from contextlib import asynccontextmanager as acm
from functools import partial
import inspect
from typing import (
Optional,
TYPE_CHECKING,
)
from typing import TYPE_CHECKING
import typing
import warnings
@ -94,7 +91,7 @@ class ActorNursery:
tuple[
Actor,
trio.Process | mp.Process,
Optional[Portal],
Portal | None,
]
] = {}
# portals spawned with ``run_in_actor()`` are
@ -110,12 +107,12 @@ class ActorNursery:
self,
name: str,
*,
bind_addr: tuple[str, int] = _default_bind_addr,
bind_addrs: list[tuple[str, int]] = [_default_bind_addr],
rpc_module_paths: list[str] | None = None,
enable_modules: list[str] | None = None,
loglevel: str | None = None, # set log level per subactor
nursery: trio.Nursery | None = None,
debug_mode: Optional[bool] | None = None,
debug_mode: bool | None = None,
infect_asyncio: bool = False,
) -> Portal:
'''
@ -150,7 +147,9 @@ class ActorNursery:
# modules allowed to invoked funcs from
enable_modules=enable_modules,
loglevel=loglevel,
arbiter_addr=current_actor()._arb_addr,
# verbatim relay this actor's registrar addresses
registry_addrs=current_actor()._reg_addrs,
)
parent_addr = self._actor.accept_addr
assert parent_addr
@ -167,7 +166,7 @@ class ActorNursery:
self,
subactor,
self.errors,
bind_addr,
bind_addrs,
parent_addr,
_rtv, # run time vars
infect_asyncio=infect_asyncio,
@ -180,8 +179,8 @@ class ActorNursery:
fn: typing.Callable,
*,
name: Optional[str] = None,
bind_addr: tuple[str, int] = _default_bind_addr,
name: str | None = None,
bind_addrs: tuple[str, int] = [_default_bind_addr],
rpc_module_paths: list[str] | None = None,
enable_modules: list[str] | None = None,
loglevel: str | None = None, # set log level per subactor
@ -208,7 +207,7 @@ class ActorNursery:
enable_modules=[mod_path] + (
enable_modules or rpc_module_paths or []
),
bind_addr=bind_addr,
bind_addrs=bind_addrs,
loglevel=loglevel,
# use the run_in_actor nursery
nursery=self._ria_nursery,