Compare commits

...

21 Commits

Author SHA1 Message Date
Tyler Goodlet 4d675deb24 `_root`: drop unused `typing` import 2025-03-18 12:23:40 -04:00
Tyler Goodlet 31297171fc Use `import <name> as <name>,` style over `__all__` in pkg mod 2025-03-18 12:23:39 -04:00
Tyler Goodlet d7a9ddd4a9 Log chan-server-startup failures via `.exception()` 2025-03-18 12:22:47 -04:00
Tyler Goodlet 43b84c99b6 `.discovery.get_arbiter()`: add warning around this now deprecated usage 2025-03-18 12:22:47 -04:00
Tyler Goodlet b7f2258f15 Add `open_root_actor(ensure_registry: bool)`
Allows forcing the opened actor to either obtain the passed registry
addrs or raise a runtime error.
2025-03-18 12:22:47 -04:00
Tyler Goodlet 1dfa109879 Fix doc string "its" typo.. 2025-03-18 12:22:47 -04:00
Tyler Goodlet 0453e3565e Test with `any(portals)` since `gather_contexts()` will return `list[None | tuple]` 2025-03-18 12:22:47 -04:00
Tyler Goodlet 0bf13c50b4 Change remaining internals to use `Actor.reg_addrs` 2025-03-18 12:22:25 -04:00
Tyler Goodlet 2e69aa0f67 Expose per-actor registry addrs via `.reg_addrs`
Since it's handy to be able to debug the *writing* of this instance var
(particularly when checking state passed down to a child in
`Actor._from_parent()`), rename and wrap the underlying
`Actor._reg_addrs` as a settable `@property` and add validation to
the `.setter` for sanity - actor discovery is a critical functionality.

Other tweaks:
- fix `.cancel_soon()` to pass expected argument..
- update internal runtime error message to be simpler and link to GH issues.
- use new `Actor.reg_addrs` throughout core.
2025-03-18 12:22:25 -04:00
Tyler Goodlet fc9c7e6e3f Get remaining suites passing..
..by ensuring `reg_addr` fixture value passthrough to subactor eps
2025-03-18 12:22:25 -04:00
Tyler Goodlet 354a4c2226 Always dynamically re-read the `._root._default_lo_addrs` value in `find_actor()` 2025-03-18 12:22:25 -04:00
Tyler Goodlet d2c88e9709 Ensure `registry_addrs` is always set to something 2025-03-18 12:22:25 -04:00
Tyler Goodlet ef179b69f2 Rename fixture `arb_addr` -> `reg_addr` and set the session value globally as `._root._default_lo_addrs` 2025-03-18 12:22:25 -04:00
Tyler Goodlet 837602a011 Facepalm, `wait_for_actor()` dun take an addr `list`.. 2025-03-18 12:22:25 -04:00
Tyler Goodlet debacef30e ._root: set a `_default_lo_addrs` and apply it when not provided by caller 2025-03-18 12:21:47 -04:00
Tyler Goodlet 7ae405ef5a Always set default reg addr in `find_actor()` if not defined 2025-03-18 12:21:47 -04:00
Tyler Goodlet e428bf0a34 Oof, default reg addrs needs to be in `list[tuple]` form.. 2025-03-18 12:21:47 -04:00
Tyler Goodlet d448bb81bd Add post-mortem catch around failed transport addr binds to aid with runtime debugging 2025-03-18 12:21:47 -04:00
Tyler Goodlet 77108a9759 Rename to `parse_maddr()` and fill out doc strings 2025-03-18 12:21:47 -04:00
Tyler Goodlet 1720fefa1d Add libp2p style "multi-address" parser from `piker`
Details are in the module docs; this is a first draft with lotsa room
for refinement and extension.
2025-03-18 12:21:47 -04:00
Tyler Goodlet a3c1f8e419 Init-support for "multi homed" transports
Since we'd like to eventually allow a diverse set of transport
(protocol) methods and stacks, and a multi-peer discovery system for
distributed actor-tree applications, this reworks all runtime internals
to support multi-homing for any given tree on a logical host. In other
words any actor can now bind its transport server (currently only
unsecured TCP + `msgspec`) to more then one address available in its
(linux) network namespace. Further, registry actors (now dubbed
"registars" instead of "arbiters") can also similarly bind to multiple
network addresses and provide discovery services to remote actors via
multiple addresses which can now be provided at runtime startup.

Deats:
- adjust `._runtime` internals to use a `list[tuple[str, int]]` (and
  thus pluralized) socket address sequence where applicable for transport
  server socket binds, now exposed via `Actor.accept_addrs`:
  - `Actor.__init__()` now takes a `registry_addrs: list`.
  - `Actor.is_arbiter` -> `.is_registrar`.
  - `._arb_addr` -> `._reg_addrs: list[tuple]`.
  - always reg and de-reg from all registrars in `async_main()`.
  - only set the global runtime var `'_root_mailbox'` to the loopback
    address since normally all in-tree processes should have access to
    it, right?
  - `._serve_forever()` task now takes `listen_sockaddrs: list[tuple]`
- make `open_root_actor()` take a `registry_addrs: list[tuple[str, int]]`
  and defaults when not passed.
- change `ActorNursery.start_..()` methods take `bind_addrs: list` and
  pass down through the spawning layer(s) via the parent-seed-msg.
- generalize all `._discovery()` APIs to accept `registry_addrs`-like
  inputs and move all relevant subsystems to adopt the "registry" style
  naming instead of "arbiter":
  - make `find_actor()` support batched concurrent portal queries over
    all provided input addresses using `.trionics.gather_contexts()` Bo
  - syntax: move to using `async with <tuples>` 3.9+ style chained
    @acms.
  - a general modernization of the code to a python 3.9+ style.
  - start deprecation and change to "registry" naming / semantics:
    - `._discovery.get_arbiter()` -> `.get_registry()`
2025-03-18 12:21:45 -04:00
15 changed files with 857 additions and 337 deletions

View File

@ -93,12 +93,18 @@ _reg_addr: tuple[str, int] = (
'127.0.0.1',
random.randint(1000, 9999),
)
_arb_addr = _reg_addr
@pytest.fixture(scope='session')
def arb_addr():
return _arb_addr
def reg_addr() -> tuple[str, int]:
# globally override the runtime to the per-test-session-dynamic
# addr so that all tests never conflict with any other actor
# tree using the default.
from tractor import _root
_root._default_lo_addrs = [_reg_addr]
return _reg_addr
def pytest_generate_tests(metafunc):
@ -140,30 +146,35 @@ def sig_prog(proc, sig):
def daemon(
loglevel: str,
testdir,
arb_addr: tuple[str, int],
reg_addr: tuple[str, int],
):
'''
Run a daemon actor as a "remote arbiter".
Run a daemon root actor as a separate actor-process tree and
"remote registrar" for discovery-protocol related tests.
'''
if loglevel in ('trace', 'debug'):
# too much logging will lock up the subproc (smh)
loglevel = 'info'
# XXX: too much logging will lock up the subproc (smh)
loglevel: str = 'info'
cmdargs = [
sys.executable, '-c',
"import tractor; tractor.run_daemon([], registry_addr={}, loglevel={})"
.format(
arb_addr,
"'{}'".format(loglevel) if loglevel else None)
code: str = (
"import tractor; "
"tractor.run_daemon([], registry_addrs={reg_addrs}, loglevel={ll})"
).format(
reg_addrs=str([reg_addr]),
ll="'{}'".format(loglevel) if loglevel else None,
)
cmd: list[str] = [
sys.executable,
'-c', code,
]
kwargs = dict()
kwargs = {}
if platform.system() == 'Windows':
# without this, tests hang on windows forever
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
proc = testdir.popen(
cmdargs,
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**kwargs,

View File

@ -203,7 +203,7 @@ def ctlc(
# XXX: disable pygments highlighting for auto-tests
# since some envs (like actions CI) will struggle
# the the added color-char encoding..
from tractor._debug import TractorConfig
from tractor.devx._debug import TractorConfig
TractorConfig.use_pygements = False
yield use_ctlc
@ -685,7 +685,7 @@ def test_multi_daemon_subactors(
# now the root actor won't clobber the bp_forever child
# during it's first access to the debug lock, but will instead
# wait for the lock to release, by the edge triggered
# ``_debug.Lock.no_remote_has_tty`` event before sending cancel messages
# ``devx._debug.Lock.no_remote_has_tty`` event before sending cancel messages
# (via portals) to its underlings B)
# at some point here there should have been some warning msg from

View File

@ -20,7 +20,7 @@ from tractor._testing import (
def run_example_in_subproc(
loglevel: str,
testdir,
arb_addr: tuple[str, int],
reg_addr: tuple[str, int],
):
@contextmanager

View File

@ -15,9 +15,19 @@ async def sleep_back_actor(
func_name,
func_defined,
exposed_mods,
*,
reg_addr: tuple,
):
if actor_name:
async with tractor.find_actor(actor_name) as portal:
async with tractor.find_actor(
actor_name,
# NOTE: must be set manually since
# the subactor doesn't have the reg_addr
# fixture code run in it!
# TODO: maybe we should just set this once in the
# _state mod and derive to all children?
registry_addrs=[reg_addr],
) as portal:
try:
await portal.run(__name__, func_name)
except tractor.RemoteActorError as err:
@ -52,11 +62,17 @@ async def short_sleep():
'fail_on_syntax',
],
)
def test_rpc_errors(reg_addr, to_call, testdir):
"""Test errors when making various RPC requests to an actor
def test_rpc_errors(
reg_addr,
to_call,
testdir,
):
'''
Test errors when making various RPC requests to an actor
that either doesn't have the requested module exposed or doesn't define
the named function.
"""
'''
exposed_mods, funcname, inside_err = to_call
subactor_exposed_mods = []
func_defined = globals().get(funcname, False)
@ -84,8 +100,13 @@ def test_rpc_errors(reg_addr, to_call, testdir):
# spawn a subactor which calls us back
async with tractor.open_nursery(
arbiter_addr=reg_addr,
registry_addrs=[reg_addr],
enable_modules=exposed_mods.copy(),
# NOTE: will halt test in REPL if uncommented, so only
# do that if actually debugging subactor but keep it
# disabled for the test.
# debug_mode=True,
) as n:
actor = tractor.current_actor()
@ -102,6 +123,7 @@ def test_rpc_errors(reg_addr, to_call, testdir):
exposed_mods=exposed_mods,
func_defined=True if func_defined else False,
enable_modules=subactor_exposed_mods,
reg_addr=reg_addr,
)
def run():

View File

@ -32,8 +32,7 @@ async def spawn(
if actor.is_arbiter:
async with tractor.open_nursery(
) as nursery:
async with tractor.open_nursery() as nursery:
# forks here
portal = await nursery.run_in_actor(
@ -55,7 +54,9 @@ async def spawn(
return 10
def test_local_arbiter_subactor_global_state(reg_addr):
def test_local_arbiter_subactor_global_state(
reg_addr,
):
result = trio.run(
spawn,
True,

View File

@ -18,76 +18,49 @@
tractor: structured concurrent ``trio``-"actors".
"""
from exceptiongroup import BaseExceptionGroup
from exceptiongroup import BaseExceptionGroup as BaseExceptionGroup
from ._clustering import open_actor_cluster
from ._clustering import (
open_actor_cluster as open_actor_cluster,
)
from ._context import (
Context, # the type
context, # a func-decorator
Context as Context, # the type
context as context, # a func-decorator
)
from ._streaming import (
MsgStream,
stream,
MsgStream as MsgStream,
stream as stream,
)
from ._discovery import (
get_arbiter,
find_actor,
wait_for_actor,
query_actor,
get_arbiter as get_arbiter,
find_actor as find_actor,
wait_for_actor as wait_for_actor,
query_actor as query_actor,
)
from ._supervise import (
open_nursery as open_nursery,
ActorNursery as ActorNursery,
)
from ._supervise import open_nursery
from ._state import (
current_actor,
is_root_process,
current_actor as current_actor,
is_root_process as is_root_process,
)
from ._exceptions import (
RemoteActorError,
ModuleNotExposed,
ContextCancelled,
RemoteActorError as RemoteActorError,
ModuleNotExposed as ModuleNotExposed,
ContextCancelled as ContextCancelled,
)
from .devx import (
breakpoint,
pause,
pause_from_sync,
post_mortem,
breakpoint as breakpoint,
pause as pause,
pause_from_sync as pause_from_sync,
post_mortem as post_mortem,
)
from . import msg
from . import msg as msg
from ._root import (
run_daemon,
open_root_actor,
run_daemon as run_daemon,
open_root_actor as open_root_actor,
)
from ._ipc import Channel
from ._portal import Portal
from ._runtime import Actor
__all__ = [
'Actor',
'BaseExceptionGroup',
'Channel',
'Context',
'ContextCancelled',
'ModuleNotExposed',
'MsgStream',
'Portal',
'RemoteActorError',
'breakpoint',
'context',
'current_actor',
'find_actor',
'query_actor',
'get_arbiter',
'is_root_process',
'msg',
'open_actor_cluster',
'open_nursery',
'open_root_actor',
'pause',
'post_mortem',
'pause_from_sync',
'query_actor',
'run_daemon',
'stream',
'to_asyncio',
'wait_for_actor',
]
from ._ipc import Channel as Channel
from ._portal import Portal as Portal
from ._runtime import Actor as Actor

View File

@ -15,16 +15,20 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Actor discovery API.
Discovery (protocols) API for automatic addressing and location
management of (service) actors.
"""
from __future__ import annotations
from typing import (
Optional,
Union,
AsyncGenerator,
AsyncContextManager,
TYPE_CHECKING,
)
from contextlib import asynccontextmanager as acm
import warnings
from .trionics import gather_contexts
from ._ipc import _connect_chan, Channel
from ._portal import (
Portal,
@ -34,13 +38,19 @@ from ._portal import (
from ._state import current_actor, _runtime_vars
@acm
async def get_arbiter(
if TYPE_CHECKING:
from ._runtime import Actor
@acm
async def get_registry(
host: str,
port: int,
) -> AsyncGenerator[Union[Portal, LocalPortal], None]:
) -> AsyncGenerator[
Portal | LocalPortal | None,
None,
]:
'''
Return a portal instance connected to a local or remote
arbiter.
@ -51,16 +61,33 @@ async def get_arbiter(
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
if actor.is_arbiter:
if actor.is_registrar:
# we're already the arbiter
# (likely a re-entrant call from the arbiter actor)
yield LocalPortal(actor, Channel((host, port)))
yield LocalPortal(
actor,
Channel((host, port))
)
else:
async with _connect_chan(host, port) as chan:
async with (
_connect_chan(host, port) as chan,
open_portal(chan) as regstr_ptl,
):
yield regstr_ptl
async with open_portal(chan) as arb_portal:
yield arb_portal
# TODO: deprecate and this remove _arbiter form!
@acm
async def get_arbiter(*args, **kwargs):
warnings.warn(
'`tractor.get_arbiter()` is now deprecated!\n'
'Use `.get_registry()` instead!',
DeprecationWarning,
stacklevel=2,
)
async with get_registry(*args, **kwargs) as to_yield:
yield to_yield
@acm
@ -68,51 +95,80 @@ async def get_root(
**kwargs,
) -> AsyncGenerator[Portal, None]:
# TODO: rename mailbox to `_root_maddr` when we finally
# add and impl libp2p multi-addrs?
host, port = _runtime_vars['_root_mailbox']
assert host is not None
async with _connect_chan(host, port) as chan:
async with open_portal(chan, **kwargs) as portal:
yield portal
async with (
_connect_chan(host, port) as chan,
open_portal(chan, **kwargs) as portal,
):
yield portal
@acm
async def query_actor(
name: str,
arbiter_sockaddr: Optional[tuple[str, int]] = None,
arbiter_sockaddr: tuple[str, int] | None = None,
regaddr: tuple[str, int] | None = None,
) -> AsyncGenerator[tuple[str, int], None]:
) -> AsyncGenerator[
tuple[str, int] | None,
None,
]:
'''
Simple address lookup for a given actor name.
Make a transport address lookup for an actor name to a specific
registrar.
Returns the (socket) address or ``None``.
Returns the (socket) address or ``None`` if no entry under that
name exists for the given registrar listening @ `regaddr`.
'''
actor = current_actor()
async with get_arbiter(
*arbiter_sockaddr or actor._arb_addr
) as arb_portal:
actor: Actor = current_actor()
if (
name == 'registrar'
and actor.is_registrar
):
raise RuntimeError(
'The current actor IS the registry!?'
)
sockaddr = await arb_portal.run_from_ns(
if arbiter_sockaddr is not None:
warnings.warn(
'`tractor.query_actor(regaddr=<blah>)` is deprecated.\n'
'Use `registry_addrs: list[tuple]` instead!',
DeprecationWarning,
stacklevel=2,
)
regaddr: list[tuple[str, int]] = arbiter_sockaddr
reg_portal: Portal
regaddr: tuple[str, int] = regaddr or actor.reg_addrs[0]
async with get_registry(*regaddr) as reg_portal:
# TODO: return portals to all available actors - for now
# just the last one that registered
sockaddr: tuple[str, int] = await reg_portal.run_from_ns(
'self',
'find_actor',
name=name,
)
# TODO: return portals to all available actors - for now just
# the last one that registered
if name == 'arbiter' and actor.is_arbiter:
raise RuntimeError("The current actor is the arbiter")
yield sockaddr if sockaddr else None
yield sockaddr
@acm
async def find_actor(
name: str,
arbiter_sockaddr: tuple[str, int] | None = None
arbiter_sockaddr: tuple[str, int] | None = None,
registry_addrs: list[tuple[str, int]] | None = None,
) -> AsyncGenerator[Optional[Portal], None]:
only_first: bool = True,
raise_on_none: bool = False,
) -> AsyncGenerator[
Portal | list[Portal] | None,
None,
]:
'''
Ask the arbiter to find actor(s) by name.
@ -120,24 +176,79 @@ async def find_actor(
known to the arbiter.
'''
async with query_actor(
name=name,
arbiter_sockaddr=arbiter_sockaddr,
) as sockaddr:
if arbiter_sockaddr is not None:
warnings.warn(
'`tractor.find_actor(arbiter_sockaddr=<blah>)` is deprecated.\n'
'Use `registry_addrs: list[tuple]` instead!',
DeprecationWarning,
stacklevel=2,
)
registry_addrs: list[tuple[str, int]] = [arbiter_sockaddr]
if sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
@acm
async def maybe_open_portal_from_reg_addr(
addr: tuple[str, int],
):
async with query_actor(
name=name,
regaddr=addr,
) as sockaddr:
if sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None
if not registry_addrs:
# XXX NOTE: make sure to dynamically read the value on
# every call since something may change it globally (eg.
# like in our discovery test suite)!
from . import _root
registry_addrs = _root._default_lo_addrs
maybe_portals: list[
AsyncContextManager[tuple[str, int]]
] = list(
maybe_open_portal_from_reg_addr(addr)
for addr in registry_addrs
)
async with gather_contexts(
mngrs=maybe_portals,
) as portals:
# log.runtime(
# 'Gathered portals:\n'
# f'{portals}'
# )
# NOTE: `gather_contexts()` will return a
# `tuple[None, None, ..., None]` if no contact
# can be made with any regstrar at any of the
# N provided addrs!
if not any(portals):
if raise_on_none:
raise RuntimeError(
f'No actor "{name}" found registered @ {registry_addrs}'
)
yield None
return
portals: list[Portal] = list(portals)
if only_first:
yield portals[0]
else:
# TODO: currently this may return multiple portals
# given there are multi-homed or multiple registrars..
# SO, we probably need de-duplication logic?
yield portals
@acm
async def wait_for_actor(
name: str,
arbiter_sockaddr: tuple[str, int] | None = None,
# registry_addr: tuple[str, int] | None = None,
registry_addr: tuple[str, int] | None = None,
) -> AsyncGenerator[Portal, None]:
'''
@ -146,17 +257,31 @@ async def wait_for_actor(
A portal to the first registered actor is returned.
'''
actor = current_actor()
actor: Actor = current_actor()
async with get_arbiter(
*arbiter_sockaddr or actor._arb_addr,
) as arb_portal:
sockaddrs = await arb_portal.run_from_ns(
if arbiter_sockaddr is not None:
warnings.warn(
'`tractor.wait_for_actor(arbiter_sockaddr=<foo>)` is deprecated.\n'
'Use `registry_addr: tuple` instead!',
DeprecationWarning,
stacklevel=2,
)
registry_addr: tuple[str, int] = arbiter_sockaddr
# TODO: use `.trionics.gather_contexts()` like
# above in `find_actor()` as well?
reg_portal: Portal
regaddr: tuple[str, int] = registry_addr or actor.reg_addrs[0]
async with get_registry(*regaddr) as reg_portal:
sockaddrs = await reg_portal.run_from_ns(
'self',
'wait_for_actor',
name=name,
)
sockaddr = sockaddrs[-1]
# get latest registered addr by default?
# TODO: offer multi-portal yields in multi-homed case?
sockaddr: tuple[str, int] = sockaddrs[-1]
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:

View File

@ -47,8 +47,8 @@ log = get_logger(__name__)
def _mp_main(
actor: Actor, # type: ignore
accept_addr: tuple[str, int],
actor: Actor,
accept_addrs: list[tuple[str, int]],
forkserver_info: tuple[Any, Any, Any, Any, Any],
start_method: SpawnMethodKey,
parent_addr: tuple[str, int] | None = None,
@ -77,8 +77,8 @@ def _mp_main(
log.debug(f"parent_addr is {parent_addr}")
trio_main = partial(
async_main,
actor,
accept_addr,
actor=actor,
accept_addrs=accept_addrs,
parent_addr=parent_addr
)
try:
@ -96,7 +96,7 @@ def _mp_main(
def _trio_main(
actor: Actor, # type: ignore
actor: Actor,
*,
parent_addr: tuple[str, int] | None = None,
infect_asyncio: bool = False,

View File

@ -517,7 +517,9 @@ class Channel:
@acm
async def _connect_chan(
host: str, port: int
host: str,
port: int
) -> typing.AsyncGenerator[Channel, None]:
'''
Create and connect a channel with disconnect on context manager

View File

@ -0,0 +1,151 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Multiaddress parser and utils according the spec(s) defined by
`libp2p` and used in dependent project such as `ipfs`:
- https://docs.libp2p.io/concepts/fundamentals/addressing/
- https://github.com/libp2p/specs/blob/master/addressing/README.md
'''
from typing import Iterator
from bidict import bidict
# TODO: see if we can leverage libp2p ecosys projects instead of
# rolling our own (parser) impls of the above addressing specs:
# - https://github.com/libp2p/py-libp2p
# - https://docs.libp2p.io/concepts/nat/circuit-relay/#relay-addresses
# prots: bidict[int, str] = bidict({
prots: bidict[int, str] = {
'ipv4': 3,
'ipv6': 3,
'wg': 3,
'tcp': 4,
'udp': 4,
# TODO: support the next-gen shite Bo
# 'quic': 4,
# 'ssh': 7, # via rsyscall bootstrapping
}
prot_params: dict[str, tuple[str]] = {
'ipv4': ('addr',),
'ipv6': ('addr',),
'wg': ('addr', 'port', 'pubkey'),
'tcp': ('port',),
'udp': ('port',),
# 'quic': ('port',),
# 'ssh': ('port',),
}
def iter_prot_layers(
multiaddr: str,
) -> Iterator[
tuple[
int,
list[str]
]
]:
'''
Unpack a libp2p style "multiaddress" into multiple "segments"
for each "layer" of the protocoll stack (in OSI terms).
'''
tokens: list[str] = multiaddr.split('/')
root, tokens = tokens[0], tokens[1:]
assert not root # there is a root '/' on LHS
itokens = iter(tokens)
prot: str | None = None
params: list[str] = []
for token in itokens:
# every prot path should start with a known
# key-str.
if token in prots:
if prot is None:
prot: str = token
else:
yield prot, params
prot = token
params = []
elif token not in prots:
params.append(token)
else:
yield prot, params
def parse_maddr(
multiaddr: str,
) -> dict[str, str | int | dict]:
'''
Parse a libp2p style "multiaddress" into its distinct protocol
segments where each segment is of the form:
`../<protocol>/<param0>/<param1>/../<paramN>`
and is loaded into a (order preserving) `layers: dict[str,
dict[str, Any]` which holds each protocol-layer-segment of the
original `str` path as a separate entry according to its approx
OSI "layer number".
Any `paramN` in the path must be distinctly defined by a str-token in the
(module global) `prot_params` table.
For eg. for wireguard which requires an address, port number and publickey
the protocol params are specified as the entry:
'wg': ('addr', 'port', 'pubkey'),
and are thus parsed from a maddr in that order:
`'/wg/1.1.1.1/51820/<pubkey>'`
'''
layers: dict[str, str | int | dict] = {}
for (
prot_key,
params,
) in iter_prot_layers(multiaddr):
layer: int = prots[prot_key] # OSI layer used for sorting
ep: dict[str, int | str] = {'layer': layer}
layers[prot_key] = ep
# TODO; validation and resolving of names:
# - each param via a validator provided as part of the
# prot_params def? (also see `"port"` case below..)
# - do a resolv step that will check addrs against
# any loaded network.resolv: dict[str, str]
rparams: list = list(reversed(params))
for key in prot_params[prot_key]:
val: str | int = rparams.pop()
# TODO: UGHH, dunno what we should do for validation
# here, put it in the params spec somehow?
if key == 'port':
val = int(val)
ep[key] = val
return layers

View File

@ -461,7 +461,12 @@ class LocalPortal:
actor: 'Actor' # type: ignore # noqa
channel: Channel
async def run_from_ns(self, ns: str, func_name: str, **kwargs) -> Any:
async def run_from_ns(
self,
ns: str,
func_name: str,
**kwargs,
) -> Any:
'''
Run a requested local function from a namespace path and
return it's result.

View File

@ -25,7 +25,6 @@ import logging
import signal
import sys
import os
import typing
import warnings
@ -47,8 +46,14 @@ from ._exceptions import is_multi_cancelled
# set at startup and after forks
_default_arbiter_host: str = '127.0.0.1'
_default_arbiter_port: int = 1616
_default_host: str = '127.0.0.1'
_default_port: int = 1616
# default registry always on localhost
_default_lo_addrs: list[tuple[str, int]] = [(
_default_host,
_default_port,
)]
logger = log.get_logger('tractor')
@ -59,10 +64,10 @@ async def open_root_actor(
*,
# defaults are above
arbiter_addr: tuple[str, int] | None = None,
registry_addrs: list[tuple[str, int]] | None = None,
# defaults are above
registry_addr: tuple[str, int] | None = None,
arbiter_addr: tuple[str, int] | None = None,
name: str | None = 'root',
@ -80,7 +85,11 @@ async def open_root_actor(
enable_modules: list | None = None,
rpc_module_paths: list | None = None,
) -> typing.Any:
# NOTE: allow caller to ensure that only one registry exists
# and that this call creates it.
ensure_registry: bool = False,
) -> Actor:
'''
Runtime init entry point for ``tractor``.
@ -116,20 +125,19 @@ async def open_root_actor(
if arbiter_addr is not None:
warnings.warn(
'`arbiter_addr` is now deprecated and has been renamed to'
'`registry_addr`.\nUse that instead..',
'`arbiter_addr` is now deprecated\n'
'Use `registry_addrs: list[tuple]` instead..',
DeprecationWarning,
stacklevel=2,
)
registry_addrs = [arbiter_addr]
registry_addr = (host, port) = (
registry_addr
or arbiter_addr
or (
_default_arbiter_host,
_default_arbiter_port,
)
registry_addrs: list[tuple[str, int]] = (
registry_addrs
or
_default_lo_addrs
)
assert registry_addrs
loglevel = (loglevel or log._default_loglevel).upper()
@ -158,73 +166,131 @@ async def open_root_actor(
log.get_console_log(loglevel)
try:
# make a temporary connection to see if an arbiter exists,
# if one can't be made quickly we assume none exists.
arbiter_found = False
# closed into below ping task-func
ponged_addrs: list[tuple[str, int]] = []
# TODO: this connect-and-bail forces us to have to carefully
# rewrap TCP 104-connection-reset errors as EOF so as to avoid
# propagating cancel-causing errors to the channel-msg loop
# machinery. Likely it would be better to eventually have
# a "discovery" protocol with basic handshake instead.
with trio.move_on_after(1):
async with _connect_chan(host, port):
arbiter_found = True
async def ping_tpt_socket(
addr: tuple[str, int],
timeout: float = 1,
) -> None:
'''
Attempt temporary connection to see if a registry is
listening at the requested address by a tranport layer
ping.
except OSError:
# TODO: make this a "discovery" log level?
logger.warning(f"No actor registry found @ {host}:{port}")
If a connection can't be made quickly we assume none no
server is listening at that addr.
# create a local actor and start up its main routine/task
if arbiter_found:
'''
try:
# TODO: this connect-and-bail forces us to have to
# carefully rewrap TCP 104-connection-reset errors as
# EOF so as to avoid propagating cancel-causing errors
# to the channel-msg loop machinery. Likely it would
# be better to eventually have a "discovery" protocol
# with basic handshake instead?
with trio.move_on_after(timeout):
async with _connect_chan(*addr):
ponged_addrs.append(addr)
except OSError:
# TODO: make this a "discovery" log level?
logger.warning(f'No actor registry found @ {addr}')
async with trio.open_nursery() as tn:
for addr in registry_addrs:
tn.start_soon(
ping_tpt_socket,
tuple(addr), # TODO: just drop this requirement?
)
trans_bind_addrs: list[tuple[str, int]] = []
# Create a new local root-actor instance which IS NOT THE
# REGISTRAR
if ponged_addrs:
if ensure_registry:
raise RuntimeError(
f'Failed to open `{name}`@{ponged_addrs}: '
'registry socket(s) already bound'
)
# we were able to connect to an arbiter
logger.info(f"Arbiter seems to exist @ {host}:{port}")
logger.info(
f'Registry(s) seem(s) to exist @ {ponged_addrs}'
)
actor = Actor(
name or 'anonymous',
arbiter_addr=registry_addr,
name=name or 'anonymous',
registry_addrs=ponged_addrs,
loglevel=loglevel,
enable_modules=enable_modules,
)
host, port = (host, 0)
# DO NOT use the registry_addrs as the transport server
# addrs for this new non-registar, root-actor.
for host, port in ponged_addrs:
# NOTE: zero triggers dynamic OS port allocation
trans_bind_addrs.append((host, 0))
# Start this local actor as the "registrar", aka a regular
# actor who manages the local registry of "mailboxes" of
# other process-tree-local sub-actors.
else:
# start this local actor as the arbiter (aka a regular actor who
# manages the local registry of "mailboxes")
# Note that if the current actor is the arbiter it is desirable
# for it to stay up indefinitely until a re-election process has
# taken place - which is not implemented yet FYI).
# NOTE that if the current actor IS THE REGISTAR, the
# following init steps are taken:
# - the tranport layer server is bound to each (host, port)
# pair defined in provided registry_addrs, or the default.
trans_bind_addrs = registry_addrs
# - it is normally desirable for any registrar to stay up
# indefinitely until either all registered (child/sub)
# actors are terminated (via SC supervision) or,
# a re-election process has taken place.
# NOTE: all of ^ which is not implemented yet - see:
# https://github.com/goodboy/tractor/issues/216
# https://github.com/goodboy/tractor/pull/348
# https://github.com/goodboy/tractor/issues/296
actor = Arbiter(
name or 'arbiter',
arbiter_addr=registry_addr,
name or 'registrar',
registry_addrs=registry_addrs,
loglevel=loglevel,
enable_modules=enable_modules,
)
# Start up main task set via core actor-runtime nurseries.
try:
# assign process-local actor
_state._current_actor = actor
# start local channel-server and fake the portal API
# NOTE: this won't block since we provide the nursery
logger.info(f"Starting local {actor} @ {host}:{port}")
ml_addrs_str: str = '\n'.join(
f'@{addr}' for addr in trans_bind_addrs
)
logger.info(
f'Starting local {actor.uid} on the following transport addrs:\n'
f'{ml_addrs_str}'
)
# start the actor runtime in a new task
async with trio.open_nursery() as nursery:
# ``_runtime.async_main()`` creates an internal nursery and
# thus blocks here until the entire underlying actor tree has
# terminated thereby conducting structured concurrency.
# ``_runtime.async_main()`` creates an internal nursery
# and blocks here until any underlying actor(-process)
# tree has terminated thereby conducting so called
# "end-to-end" structured concurrency throughout an
# entire hierarchical python sub-process set; all
# "actor runtime" primitives are SC-compat and thus all
# transitively spawned actors/processes must be as
# well.
await nursery.start(
partial(
async_main,
actor,
accept_addr=(host, port),
accept_addrs=trans_bind_addrs,
parent_addr=None
)
)
@ -236,7 +302,7 @@ async def open_root_actor(
BaseExceptionGroup,
) as err:
entered = await _debug._maybe_enter_pm(err)
entered: bool = await _debug._maybe_enter_pm(err)
if (
not entered
and
@ -244,7 +310,8 @@ async def open_root_actor(
):
logger.exception('Root actor crashed:\n')
# always re-raise
# ALWAYS re-raise any error bubbled up from the
# runtime!
raise
finally:
@ -265,7 +332,7 @@ async def open_root_actor(
_state._current_actor = None
_state._last_actor_terminated = actor
# restore breakpoint hook state
# restore built-in `breakpoint()` hook state
sys.breakpointhook = builtin_bp_handler
if orig_bp_path is not None:
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
@ -281,10 +348,7 @@ def run_daemon(
# runtime kwargs
name: str | None = 'root',
registry_addr: tuple[str, int] = (
_default_arbiter_host,
_default_arbiter_port,
),
registry_addrs: list[tuple[str, int]] = _default_lo_addrs,
start_method: str | None = None,
debug_mode: bool = False,
@ -308,7 +372,7 @@ def run_daemon(
async def _main():
async with open_root_actor(
registry_addr=registry_addr,
registry_addrs=registry_addrs,
name=name,
start_method=start_method,
debug_mode=debug_mode,

View File

@ -45,6 +45,7 @@ from functools import partial
from itertools import chain
import importlib
import importlib.util
import os
from pprint import pformat
import signal
import sys
@ -55,7 +56,7 @@ from typing import (
)
import uuid
from types import ModuleType
import os
import warnings
import trio
from trio import (
@ -77,8 +78,8 @@ from ._exceptions import (
ContextCancelled,
TransportClosed,
)
from ._discovery import get_arbiter
from .devx import _debug
from ._discovery import get_registry
from ._portal import Portal
from . import _state
from . import _mp_fixup_main
@ -127,6 +128,11 @@ class Actor:
# ugh, we need to get rid of this and replace with a "registry" sys
# https://github.com/goodboy/tractor/issues/216
is_arbiter: bool = False
@property
def is_registrar(self) -> bool:
return self.is_arbiter
msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()` after fork
@ -164,8 +170,12 @@ class Actor:
enable_modules: list[str] = [],
uid: str | None = None,
loglevel: str | None = None,
registry_addrs: list[tuple[str, int]] | None = None,
spawn_method: str | None = None,
# TODO: remove!
arbiter_addr: tuple[str, int] | None = None,
spawn_method: str | None = None
) -> None:
'''
This constructor is called in the parent actor **before** the spawning
@ -189,27 +199,30 @@ class Actor:
# always include debugging tools module
enable_modules.append('tractor.devx._debug')
mods = {}
self.enable_modules: dict[str, str] = {}
for name in enable_modules:
mod = importlib.import_module(name)
mods[name] = _get_mod_abspath(mod)
mod: ModuleType = importlib.import_module(name)
self.enable_modules[name] = _get_mod_abspath(mod)
self.enable_modules = mods
self._mods: dict[str, ModuleType] = {}
self.loglevel = loglevel
self.loglevel: str = loglevel
self._arb_addr: tuple[str, int] | None = (
str(arbiter_addr[0]),
int(arbiter_addr[1])
) if arbiter_addr else None
if arbiter_addr is not None:
warnings.warn(
'`Actor(arbiter_addr=<blah>)` is now deprecated.\n'
'Use `registry_addrs: list[tuple]` instead.',
DeprecationWarning,
stacklevel=2,
)
registry_addrs: list[tuple[str, int]] = [arbiter_addr]
# marked by the process spawning backend at startup
# will be None for the parent most process started manually
# by the user (currently called the "arbiter")
self._spawn_method = spawn_method
self._spawn_method: str = spawn_method
self._peers: defaultdict = defaultdict(list)
self._peer_connected: dict = {}
self._peer_connected: dict[tuple[str, str], trio.Event] = {}
self._no_more_peers = trio.Event()
self._no_more_peers.set()
self._ongoing_rpc_tasks = trio.Event()
@ -239,6 +252,44 @@ class Actor:
ActorNursery | None,
] = {} # type: ignore # noqa
# when provided, init the registry addresses property from
# input via the validator.
self._reg_addrs: list[tuple[str, int]] = []
if registry_addrs:
self.reg_addrs: list[tuple[str, int]] = registry_addrs
@property
def reg_addrs(self) -> list[tuple[str, int]]:
'''
List of (socket) addresses for all known (and contactable)
registry actors.
'''
return self._reg_addrs
@reg_addrs.setter
def reg_addrs(
self,
addrs: list[tuple[str, int]],
) -> None:
if not addrs:
log.warning(
'Empty registry address list is invalid:\n'
f'{addrs}'
)
return
# always sanity check the input list since it's critical
# that addrs are correct for discovery sys operation.
for addr in addrs:
if not isinstance(addr, tuple):
raise ValueError(
'Expected `Actor.reg_addrs: list[tuple[str, int]]`\n'
f'Got {addrs}'
)
self._reg_addrs = addrs
async def wait_for_peer(
self, uid: tuple[str, str]
) -> tuple[trio.Event, Channel]:
@ -336,6 +387,12 @@ class Actor:
self._no_more_peers = trio.Event() # unset by making new
chan = Channel.from_stream(stream)
their_uid: tuple[str, str]|None = chan.uid
if their_uid:
log.warning(
f'Re-connection from already known {their_uid}'
)
else:
log.runtime(f'New connection to us @{chan.raddr}')
con_msg: str = ''
if their_uid:
@ -517,16 +574,19 @@ class Actor:
if disconnected:
# if the transport died and this actor is still
# registered within a local nursery, we report that the
# IPC layer may have failed unexpectedly since it may be
# the cause of other downstream errors.
# registered within a local nursery, we report
# that the IPC layer may have failed
# unexpectedly since it may be the cause of
# other downstream errors.
entry = local_nursery._children.get(uid)
if entry:
proc: trio.Process
_, proc, _ = entry
poll = getattr(proc, 'poll', None)
if poll and poll() is None:
if (
(poll := getattr(proc, 'poll', None))
and poll() is None
):
log.cancel(
f'Peer IPC broke but subproc is alive?\n\n'
@ -880,11 +940,11 @@ class Actor:
)
await chan.connect()
# TODO: move this into a `Channel.handshake()`?
# Initial handshake: swap names.
await self._do_handshake(chan)
accept_addr: tuple[str, int] | None = None
accept_addrs: list[tuple[str, int]] | None = None
if self._spawn_method == "trio":
# Receive runtime state from our parent
parent_data: dict[str, Any]
@ -897,10 +957,7 @@ class Actor:
# if "trace"/"util" mode is enabled?
f'{pformat(parent_data)}\n'
)
accept_addr = (
parent_data.pop('bind_host'),
parent_data.pop('bind_port'),
)
accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs')
rvs = parent_data.pop('_runtime_vars')
if rvs['_debug_mode']:
@ -918,18 +975,23 @@ class Actor:
_state._runtime_vars.update(rvs)
for attr, value in parent_data.items():
if attr == '_arb_addr':
if (
attr == 'reg_addrs'
and value
):
# XXX: ``msgspec`` doesn't support serializing tuples
# so just cash manually here since it's what our
# internals expect.
value = tuple(value) if value else None
self._arb_addr = value
# TODO: we don't really NEED these as
# tuples so we can probably drop this
# casting since apparently in python lists
# are "more efficient"?
self.reg_addrs = [tuple(val) for val in value]
else:
setattr(self, attr, value)
return chan, accept_addr
return chan, accept_addrs
except OSError: # failed to connect
log.warning(
@ -946,9 +1008,9 @@ class Actor:
handler_nursery: Nursery,
*,
# (host, port) to bind for channel server
accept_host: tuple[str, int] | None = None,
accept_port: int = 0,
task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
listen_sockaddrs: list[tuple[str, int]] | None = None,
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Start the IPC transport server, begin listening for new connections.
@ -958,30 +1020,40 @@ class Actor:
`.cancel_server()` is called.
'''
if listen_sockaddrs is None:
listen_sockaddrs = [(None, 0)]
self._server_down = trio.Event()
try:
async with trio.open_nursery() as server_n:
listeners: list[trio.abc.Listener] = await server_n.start(
partial(
trio.serve_tcp,
self._stream_handler,
# new connections will stay alive even if this server
# is cancelled
handler_nursery=handler_nursery,
port=accept_port,
host=accept_host,
for host, port in listen_sockaddrs:
listeners: list[trio.abc.Listener] = await server_n.start(
partial(
trio.serve_tcp,
handler=self._stream_handler,
port=port,
host=host,
# NOTE: configured such that new
# connections will stay alive even if
# this server is cancelled!
handler_nursery=handler_nursery,
)
)
)
sockets: list[trio.socket] = [
getattr(listener, 'socket', 'unknown socket')
for listener in listeners
]
log.runtime(
'Started TCP server(s)\n'
f'|_{sockets}\n'
)
self._listeners.extend(listeners)
sockets: list[trio.socket] = [
getattr(listener, 'socket', 'unknown socket')
for listener in listeners
]
log.runtime(
'Started TCP server(s)\n'
f'|_{sockets}\n'
)
self._listeners.extend(listeners)
task_status.started(server_n)
finally:
# signal the server is down since nursery above terminated
self._server_down.set()
@ -1319,6 +1391,18 @@ class Actor:
self._server_n.cancel_scope.cancel()
@property
def accept_addrs(self) -> list[tuple[str, int]]:
'''
All addresses to which the transport-channel server binds
and listens for new connections.
'''
# throws OSError on failure
return [
listener.socket.getsockname()
for listener in self._listeners
] # type: ignore
@property
def accept_addr(self) -> tuple[str, int]:
'''
@ -1327,7 +1411,7 @@ class Actor:
'''
# throws OSError on failure
return self._listeners[0].socket.getsockname() # type: ignore
return self.accept_addrs[0]
def get_parent(self) -> Portal:
'''
@ -1344,6 +1428,7 @@ class Actor:
'''
return self._peers[uid]
# TODO: move to `Channel.handshake(uid)`
async def _do_handshake(
self,
chan: Channel
@ -1380,7 +1465,7 @@ class Actor:
async def async_main(
actor: Actor,
accept_addr: tuple[str, int] | None = None,
accept_addrs: tuple[str, int] | None = None,
# XXX: currently ``parent_addr`` is only needed for the
# ``multiprocessing`` backend (which pickles state sent to
@ -1408,20 +1493,25 @@ async def async_main(
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
registered_with_arbiter = False
is_registered: bool = False
try:
# establish primary connection with immediate parent
actor._parent_chan = None
actor._parent_chan: Channel | None = None
if parent_addr is not None:
actor._parent_chan, accept_addr_rent = await actor._from_parent(
parent_addr)
(
actor._parent_chan,
set_accept_addr_says_rent,
) = await actor._from_parent(parent_addr)
# either it's passed in because we're not a child
# or because we're running in mp mode
if accept_addr_rent is not None:
accept_addr = accept_addr_rent
# either it's passed in because we're not a child or
# because we're running in mp mode
if (
set_accept_addr_says_rent
and set_accept_addr_says_rent is not None
):
accept_addrs = set_accept_addr_says_rent
# The "root" nursery ensures the channel with the immediate
# parent is kept alive as a resilient service until
@ -1461,34 +1551,72 @@ async def async_main(
# - subactor: the bind address is sent by our parent
# over our established channel
# - root actor: the ``accept_addr`` passed to this method
assert accept_addr
host, port = accept_addr
assert accept_addrs
actor._server_n = await service_nursery.start(
partial(
actor._serve_forever,
service_nursery,
accept_host=host,
accept_port=port
try:
actor._server_n = await service_nursery.start(
partial(
actor._serve_forever,
service_nursery,
listen_sockaddrs=accept_addrs,
)
)
)
accept_addr = actor.accept_addr
except OSError as oserr:
# NOTE: always allow runtime hackers to debug
# tranport address bind errors - normally it's
# something silly like the wrong socket-address
# passed via a config or CLI Bo
entered_debug: bool = await _debug._maybe_enter_pm(oserr)
if not entered_debug:
log.exception('Failed to init IPC channel server !?\n')
raise
accept_addrs: list[tuple[str, int]] = actor.accept_addrs
# NOTE: only set the loopback addr for the
# process-tree-global "root" mailbox since
# all sub-actors should be able to speak to
# their root actor over that channel.
if _state._runtime_vars['_is_root']:
_state._runtime_vars['_root_mailbox'] = accept_addr
for addr in accept_addrs:
host, _ = addr
# TODO: generic 'lo' detector predicate
if '127.0.0.1' in host:
_state._runtime_vars['_root_mailbox'] = addr
# Register with the arbiter if we're told its addr
log.runtime(f"Registering {actor} for role `{actor.name}`")
assert isinstance(actor._arb_addr, tuple)
log.runtime(
f'Registering `{actor.name}` ->\n'
f'{pformat(accept_addrs)}'
)
async with get_arbiter(*actor._arb_addr) as arb_portal:
await arb_portal.run_from_ns(
'self',
'register_actor',
uid=actor.uid,
sockaddr=accept_addr,
)
# TODO: ideally we don't fan out to all registrars
# if addresses point to the same actor..
# So we need a way to detect that? maybe iterate
# only on unique actor uids?
for addr in actor.reg_addrs:
try:
assert isinstance(addr, tuple)
assert addr[1] # non-zero after bind
except AssertionError:
await _debug.pause()
registered_with_arbiter = True
async with get_registry(*addr) as reg_portal:
for accept_addr in accept_addrs:
if not accept_addr[1]:
await _debug.pause()
assert accept_addr[1]
await reg_portal.run_from_ns(
'self',
'register_actor',
uid=actor.uid,
sockaddr=accept_addr,
)
is_registered: bool = True
# init steps complete
task_status.started()
@ -1521,18 +1649,20 @@ async def async_main(
log.runtime("Closing all actor lifetime contexts")
actor.lifetime_stack.close()
if not registered_with_arbiter:
if not is_registered:
# TODO: I guess we could try to connect back
# to the parent through a channel and engage a debugger
# once we have that all working with std streams locking?
log.exception(
f"Actor errored and failed to register with arbiter "
f"@ {actor._arb_addr}?")
f"@ {actor.reg_addrs[0]}?")
log.error(
"\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n"
"\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n"
"\tYOUR PARENT CODE IS GOING TO KEEP WORKING FINE!!!\n"
"\tTHIS IS HOW RELIABlE SYSTEMS ARE SUPPOSED TO WORK!?!?\n"
"\n\n\t^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n"
"\t>> CALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN <<\n\n"
"\tIf this is a sub-actor hopefully its parent will keep running "
"correctly presuming this error was safely ignored..\n\n"
"\tPLEASE REPORT THIS TRACEBACK IN A BUG REPORT: "
"https://github.com/goodboy/tractor/issues\n"
)
if actor._parent_chan:
@ -1572,27 +1702,33 @@ async def async_main(
# Unregister actor from the registry-sys / registrar.
if (
registered_with_arbiter
and not actor.is_arbiter
is_registered
and not actor.is_registrar
):
failed = False
assert isinstance(actor._arb_addr, tuple)
with trio.move_on_after(0.5) as cs:
cs.shield = True
try:
async with get_arbiter(*actor._arb_addr) as arb_portal:
await arb_portal.run_from_ns(
'self',
'unregister_actor',
uid=actor.uid
)
except OSError:
failed: bool = False
for addr in actor.reg_addrs:
assert isinstance(addr, tuple)
with trio.move_on_after(0.5) as cs:
cs.shield = True
try:
async with get_registry(
*addr,
) as reg_portal:
await reg_portal.run_from_ns(
'self',
'unregister_actor',
uid=actor.uid
)
except OSError:
failed = True
if cs.cancelled_caught:
failed = True
if cs.cancelled_caught:
failed = True
if failed:
log.warning(
f"Failed to unregister {actor.name} from arbiter")
if failed:
log.warning(
f'Failed to unregister {actor.name} from '
f'registar @ {addr}'
)
# Ensure all peers (actors connected to us as clients) are finished
if not actor._no_more_peers.is_set():
@ -1611,18 +1747,36 @@ async def async_main(
# TODO: rename to `Registry` and move to `._discovery`!
class Arbiter(Actor):
'''
A special actor who knows all the other actors and always has
access to a top level nursery.
A special registrar actor who can contact all other actors
within its immediate process tree and possibly keeps a registry
of others meant to be discoverable in a distributed
application. Normally the registrar is also the "root actor"
and thus always has access to the top-most-level actor
(process) nursery.
The arbiter is by default the first actor spawned on each host
and is responsible for keeping track of all other actors for
coordination purposes. If a new main process is launched and an
arbiter is already running that arbiter will be used.
By default, the registrar is always initialized when and if no
other registrar socket addrs have been specified to runtime
init entry-points (such as `open_root_actor()` or
`open_nursery()`). Any time a new main process is launched (and
thus thus a new root actor created) and, no existing registrar
can be contacted at the provided `registry_addr`, then a new
one is always created; however, if one can be reached it is
used.
Normally a distributed app requires at least registrar per
logical host where for that given "host space" (aka localhost
IPC domain of addresses) it is responsible for making all other
host (local address) bound actors *discoverable* to external
actor trees running on remote hosts.
'''
is_arbiter = True
def __init__(self, *args, **kwargs) -> None:
def __init__(
self,
*args,
**kwargs,
) -> None:
self._registry: dict[
tuple[str, str],
@ -1664,7 +1818,10 @@ class Arbiter(Actor):
# unpacker since we have tuples as keys (not this makes the
# arbiter suscetible to hashdos):
# https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10
return {'.'.join(key): val for key, val in self._registry.items()}
return {
'.'.join(key): val
for key, val in self._registry.items()
}
async def wait_for_actor(
self,
@ -1707,8 +1864,15 @@ class Arbiter(Actor):
sockaddr: tuple[str, int]
) -> None:
uid = name, _ = (str(uid[0]), str(uid[1]))
self._registry[uid] = (str(sockaddr[0]), int(sockaddr[1]))
uid = name, hash = (str(uid[0]), str(uid[1]))
addr = (host, port) = (
str(sockaddr[0]),
int(sockaddr[1]),
)
if port == 0:
await _debug.pause()
assert port # should never be 0-dynamic-os-alloc
self._registry[uid] = addr
# pop and signal all waiter events
events = self._waiters.pop(name, [])

View File

@ -220,6 +220,10 @@ async def hard_kill(
# whilst also hacking on it XD
# terminate_after: int = 99999,
# NOTE: for mucking with `.pause()`-ing inside the runtime
# whilst also hacking on it XD
# terminate_after: int = 99999,
) -> None:
'''
Un-gracefully terminate an OS level `trio.Process` after timeout.
@ -365,7 +369,7 @@ async def new_proc(
errors: dict[tuple[str, str], Exception],
# passed through to actor main
bind_addr: tuple[str, int],
bind_addrs: list[tuple[str, int]],
parent_addr: tuple[str, int],
_runtime_vars: dict[str, Any], # serialized and sent to _child
@ -387,7 +391,7 @@ async def new_proc(
actor_nursery,
subactor,
errors,
bind_addr,
bind_addrs,
parent_addr,
_runtime_vars, # run time vars
infect_asyncio=infect_asyncio,
@ -402,7 +406,7 @@ async def trio_proc(
errors: dict[tuple[str, str], Exception],
# passed through to actor main
bind_addr: tuple[str, int],
bind_addrs: list[tuple[str, int]],
parent_addr: tuple[str, int],
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
@ -491,12 +495,11 @@ async def trio_proc(
# send additional init params
await chan.send({
"_parent_main_data": subactor._parent_main_data,
"enable_modules": subactor.enable_modules,
"_arb_addr": subactor._arb_addr,
"bind_host": bind_addr[0],
"bind_port": bind_addr[1],
"_runtime_vars": _runtime_vars,
'_parent_main_data': subactor._parent_main_data,
'enable_modules': subactor.enable_modules,
'reg_addrs': subactor.reg_addrs,
'bind_addrs': bind_addrs,
'_runtime_vars': _runtime_vars,
})
# track subactor in current nursery
@ -602,7 +605,7 @@ async def mp_proc(
subactor: Actor,
errors: dict[tuple[str, str], Exception],
# passed through to actor main
bind_addr: tuple[str, int],
bind_addrs: list[tuple[str, int]],
parent_addr: tuple[str, int],
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
@ -660,7 +663,7 @@ async def mp_proc(
target=_mp_main,
args=(
subactor,
bind_addr,
bind_addrs,
fs_info,
_spawn_method,
parent_addr,

View File

@ -22,10 +22,7 @@ from contextlib import asynccontextmanager as acm
from functools import partial
import inspect
from pprint import pformat
from typing import (
Optional,
TYPE_CHECKING,
)
from typing import TYPE_CHECKING
import typing
import warnings
@ -97,7 +94,7 @@ class ActorNursery:
tuple[
Actor,
trio.Process | mp.Process,
Optional[Portal],
Portal | None,
]
] = {}
# portals spawned with ``run_in_actor()`` are
@ -121,12 +118,12 @@ class ActorNursery:
self,
name: str,
*,
bind_addr: tuple[str, int] = _default_bind_addr,
bind_addrs: list[tuple[str, int]] = [_default_bind_addr],
rpc_module_paths: list[str] | None = None,
enable_modules: list[str] | None = None,
loglevel: str | None = None, # set log level per subactor
nursery: trio.Nursery | None = None,
debug_mode: Optional[bool] | None = None,
debug_mode: bool | None = None,
infect_asyncio: bool = False,
) -> Portal:
'''
@ -161,7 +158,9 @@ class ActorNursery:
# modules allowed to invoked funcs from
enable_modules=enable_modules,
loglevel=loglevel,
arbiter_addr=current_actor()._arb_addr,
# verbatim relay this actor's registrar addresses
registry_addrs=current_actor().reg_addrs,
)
parent_addr = self._actor.accept_addr
assert parent_addr
@ -178,7 +177,7 @@ class ActorNursery:
self,
subactor,
self.errors,
bind_addr,
bind_addrs,
parent_addr,
_rtv, # run time vars
infect_asyncio=infect_asyncio,
@ -191,8 +190,8 @@ class ActorNursery:
fn: typing.Callable,
*,
name: Optional[str] = None,
bind_addr: tuple[str, int] = _default_bind_addr,
name: str | None = None,
bind_addrs: tuple[str, int] = [_default_bind_addr],
rpc_module_paths: list[str] | None = None,
enable_modules: list[str] | None = None,
loglevel: str | None = None, # set log level per subactor
@ -221,7 +220,7 @@ class ActorNursery:
enable_modules=[mod_path] + (
enable_modules or rpc_module_paths or []
),
bind_addr=bind_addr,
bind_addrs=bind_addrs,
loglevel=loglevel,
# use the run_in_actor nursery
nursery=self._ria_nursery,