Merge pull request #427 from goodboy/subsys_reorg

Mv core mods to `runtime/`, `spawn/`, `discovery/` subpkgs
multicast_revertable_streams
Bd 2026-04-02 18:21:00 -04:00 committed by GitHub
commit 8f6bc56174
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
60 changed files with 893 additions and 444 deletions

View File

@ -20,7 +20,7 @@ async def sleep(
async def open_ctx(
n: tractor._supervise.ActorNursery
n: tractor.runtime._supervise.ActorNursery
):
# spawn both actors

View File

@ -10,7 +10,7 @@ async def main(service_name):
await an.start_actor(service_name)
async with tractor.get_registry() as portal:
print(f"Arbiter is listening on {portal.channel}")
print(f"Registrar is listening on {portal.channel}")
async with tractor.wait_for_actor(service_name) as sockaddr:
print(f"my_service is found at {sockaddr}")

View File

@ -9,6 +9,8 @@ import os
import signal
import platform
import time
from pathlib import Path
from typing import Literal
import pytest
import tractor
@ -52,6 +54,76 @@ no_macos = pytest.mark.skipif(
)
def get_cpu_state(
    icpu: int = 0,
    setting: Literal[
        'scaling_governor',
        '*_pstate_max_freq',
        'scaling_max_freq',
        # 'scaling_cur_freq',
    ] = '*_pstate_max_freq',
) -> tuple[
    Path,
    str|int,
]|None:
    '''
    Try to read the chosen cpufreq `setting` for CPU `icpu` from the
    linux sys-filesystem path,

      /sys/devices/system/cpu/cpu{icpu}/cpufreq/{setting}

    Returns a `(path, raw-value)` pair on success, `None` when the
    sysfs entry is missing (non-linux hosts, containers, etc.).

    Useful to determine latency headroom for various perf affected
    test suites.

    '''
    cpufreq_dir = Path(
        f'/sys/devices/system/cpu/cpu{icpu}/cpufreq/'
    )
    try:
        # `setting` may be a glob pattern (e.g. `*_pstate_max_freq`);
        # take the first (expected to be the only!) match.
        setting_path: Path = next(
            iter(cpufreq_dir.glob(setting))
        )
        return (
            setting_path,
            setting_path.read_text().strip(),
        )
    except (
        FileNotFoundError,  # racy removal after glob match
        StopIteration,  # no glob match at all
    ):
        return None
def cpu_scaling_factor() -> float:
    '''
    Return a latency-headroom multiplier (>= 1.0) reflecting how
    much to inflate time-limits when CPU-freq scaling is active on
    linux.

    When no scaling info is available (non-linux, missing sysfs,
    unparsable freq values), returns 1.0 (i.e. no headroom
    adjustment needed).

    '''
    if _non_linux:
        return 1.

    mx = get_cpu_state()
    cur = get_cpu_state(setting='scaling_max_freq')
    if mx is None or cur is None:
        return 1.

    _mx_pth, max_freq = mx
    _cur_pth, cur_freq = cur
    try:
        max_hz: int = int(max_freq)
        cur_hz: int = int(cur_freq)
    except ValueError:
        # non-integer sysfs contents; no usable scaling info.
        return 1.

    if max_hz <= 0:
        # avoid `ZeroDivisionError` on bogus sysfs values.
        return 1.

    cpu_scaled: float = cur_hz / max_hz
    if cpu_scaled != 1.:
        # XXX BUG-FIX: clamp to >= 1.0 as per this fn's contract;
        # without it any `0.5 < cpu_scaled < 1.0` yields a factor
        # *below* 1.0 which would SHRINK caller timeouts instead of
        # inflating them!
        return max(
            1.,
            1. / (
                cpu_scaled * 2  # <- bc likely "dual threaded"
            ),
        )

    return 1.
def pytest_addoption(
parser: pytest.Parser,
):

View File

@ -126,7 +126,7 @@ def test_shield_pause(
child.pid,
signal.SIGINT,
)
from tractor._supervise import _shutdown_msg
from tractor.runtime._supervise import _shutdown_msg
expect(
child,
# 'Shutting down actor runtime',

View File

@ -8,17 +8,16 @@ from pathlib import Path
import pytest
import trio
import tractor
from tractor import (
Actor,
_state,
_addr,
)
from tractor import Actor
from tractor.runtime import _state
from tractor.discovery import _addr
@pytest.fixture
def bindspace_dir_str() -> str:
rt_dir: Path = tractor._state.get_rt_dir()
from tractor.runtime._state import get_rt_dir
rt_dir: Path = get_rt_dir()
bs_dir: Path = rt_dir / 'doggy'
bs_dir_str: str = str(bs_dir)
assert not bs_dir.is_dir()

View File

@ -13,9 +13,9 @@ from tractor import (
Portal,
ipc,
msg,
_state,
_addr,
)
from tractor.runtime import _state
from tractor.discovery import _addr
@tractor.context
async def chk_tpts(

View File

@ -61,7 +61,7 @@ async def maybe_expect_raises(
Async wrapper for ensuring errors propagate from the inner scope.
'''
if tractor._state.debug_mode():
if tractor.debug_mode():
timeout += 999
with trio.fail_after(timeout):

View File

@ -490,7 +490,7 @@ def test_cancel_via_SIGINT(
"""Ensure that a control-C (SIGINT) signal cancels both the parent and
child processes in trionic fashion
"""
pid = os.getpid()
pid: int = os.getpid()
async def main():
with trio.fail_after(2):
@ -517,6 +517,8 @@ def test_cancel_via_SIGINT_other_task(
started from a separate ``trio`` child task.
'''
from .conftest import cpu_scaling_factor
pid: int = os.getpid()
timeout: float = (
4 if _non_linux
@ -525,6 +527,11 @@ def test_cancel_via_SIGINT_other_task(
if _friggin_windows: # smh
timeout += 1
# add latency headroom for CPU freq scaling (auto-cpufreq et al.)
headroom: float = cpu_scaling_factor()
if headroom != 1.:
timeout *= headroom
async def spawn_and_sleep_forever(
task_status=trio.TASK_STATUS_IGNORED
):

View File

@ -10,7 +10,20 @@ from tractor._testing import tractor_test
MESSAGE = 'tractoring at full speed'
def test_empty_mngrs_input_raises() -> None:
def test_empty_mngrs_input_raises(
tpt_proto: str,
) -> None:
# TODO, the `open_actor_cluster()` teardown hangs
# intermittently on UDS when `gather_contexts(mngrs=())`
# raises `ValueError` mid-setup; likely a race in the
# actor-nursery cleanup vs UDS socket shutdown. Needs
# a deeper look at `._clustering`/`._supervise` teardown
# paths with the UDS transport.
if tpt_proto == 'uds':
pytest.skip(
'actor-cluster teardown hangs intermittently on UDS'
)
async def main():
with trio.fail_after(3):
async with (

View File

@ -26,7 +26,7 @@ from tractor._exceptions import (
StreamOverrun,
ContextCancelled,
)
from tractor._state import current_ipc_ctx
from tractor.runtime._state import current_ipc_ctx
from tractor._testing import (
tractor_test,
@ -939,7 +939,7 @@ def test_one_end_stream_not_opened(
'''
overrunner, buf_size_increase, entrypoint = overrun_by
from tractor._runtime import Actor
from tractor.runtime._runtime import Actor
buf_size = buf_size_increase + Actor.msg_buffer_size
timeout: float = (

View File

@ -24,7 +24,7 @@ async def test_reg_then_unreg(
reg_addr: tuple,
):
actor = tractor.current_actor()
assert actor.is_arbiter
assert actor.is_registrar
assert len(actor._registry) == 1 # only self is registered
async with tractor.open_nursery(
@ -35,7 +35,7 @@ async def test_reg_then_unreg(
uid = portal.channel.aid.uid
async with tractor.get_registry(reg_addr) as aportal:
# this local actor should be the arbiter
# this local actor should be the registrar
assert actor is aportal.actor
async with tractor.wait_for_actor('actor'):
@ -154,7 +154,7 @@ async def unpack_reg(
actor_or_portal: tractor.Portal|tractor.Actor,
):
'''
Get and unpack a "registry" RPC request from the "arbiter" registry
Get and unpack a "registry" RPC request from the registrar
system.
'''
@ -197,15 +197,15 @@ async def spawn_and_check_registry(
actor = tractor.current_actor()
if remote_arbiter:
assert not actor.is_arbiter
assert not actor.is_registrar
if actor.is_arbiter:
extra = 1 # arbiter is local root actor
if actor.is_registrar:
extra = 1 # registrar is local root actor
get_reg = partial(unpack_reg, actor)
else:
get_reg = partial(unpack_reg, portal)
extra = 2 # local root actor + remote arbiter
extra = 2 # local root actor + remote registrar
# ensure current actor is registered
registry: dict = await get_reg()
@ -285,7 +285,7 @@ def test_subactors_unregister_on_cancel(
):
'''
Verify that cancelling a nursery results in all subactors
deregistering themselves with the arbiter.
deregistering themselves with the registrar.
'''
with pytest.raises(KeyboardInterrupt):
@ -314,7 +314,7 @@ def test_subactors_unregister_on_cancel_remote_daemon(
'''
Verify that cancelling a nursery results in all subactors
deregistering themselves with a **remote** (not in the local
process tree) arbiter.
process tree) registrar.
'''
with pytest.raises(KeyboardInterrupt):
@ -387,7 +387,7 @@ async def close_chans_before_nursery(
await streamer(agen2)
finally:
# Kill the root nursery thus resulting in
# normal arbiter channel ops to fail during
# normal registrar channel ops to fail during
# teardown. It doesn't seem like this is
# reliably triggered by an external SIGINT.
# tractor.current_actor()._root_nursery.cancel_scope.cancel()
@ -420,7 +420,7 @@ def test_close_channel_explicit(
'''
Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter.
results in subactor(s) deregistering from the registrar.
'''
with pytest.raises(KeyboardInterrupt):
@ -444,7 +444,7 @@ def test_close_channel_explicit_remote_registrar(
'''
Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter.
results in subactor(s) deregistering from the registrar.
'''
with pytest.raises(KeyboardInterrupt):

View File

@ -94,8 +94,10 @@ def run_example_in_subproc(
for f in p[2]
if (
'__' not in f
and f[0] != '_'
'__' not in f # ignore any pkg-mods
# ignore any `__pycache__` subdir
and '__pycache__' not in str(p[0])
and f[0] != '_' # ignore any WIP "example mods"
and 'debugging' not in p[0]
and 'integration' not in p[0]
and 'advanced_faults' not in p[0]
@ -143,12 +145,19 @@ def test_example(
'This test does run just fine "in person" however..'
)
from .conftest import cpu_scaling_factor
timeout: float = (
60
if ci_env and _non_linux
else 16
)
# add latency headroom for CPU freq scaling (auto-cpufreq et al.)
headroom: float = cpu_scaling_factor()
if headroom != 1.:
timeout *= headroom
with open(ex_file, 'r') as ex:
code = ex.read()

View File

@ -26,8 +26,8 @@ from tractor import (
to_asyncio,
RemoteActorError,
ContextCancelled,
_state,
)
from tractor.runtime import _state
from tractor.trionics import BroadcastReceiver
from tractor._testing import expect_ctxc

View File

@ -201,7 +201,7 @@ async def stream_from_peer(
) -> None:
# sanity
assert tractor._state.debug_mode() == debug_mode
assert tractor.debug_mode() == debug_mode
peer: Portal
try:
@ -841,7 +841,7 @@ async def serve_subactors(
async with open_nursery() as an:
# sanity
assert tractor._state.debug_mode() == debug_mode
assert tractor.debug_mode() == debug_mode
await ctx.started(peer_name)
async with ctx.open_stream() as ipc:
@ -880,7 +880,7 @@ async def client_req_subactor(
) -> None:
# sanity
if debug_mode:
assert tractor._state.debug_mode()
assert tractor.debug_mode()
# TODO: other cases to do with sub lifetimes:
# -[ ] test that we can have the server spawn a sub

View File

@ -300,19 +300,43 @@ def test_a_quadruple_example(
time_quad_ex: tuple[list[int], float],
ci_env: bool,
spawn_backend: str,
test_log: tractor.log.StackLevelAdapter,
):
'''
This also serves as a kind of "we'd like to be this fast test".
This also serves as a "we'd like to be this fast" smoke test
given past empirical eval of this suite.
'''
non_linux: bool = (_sys := platform.system()) != 'Linux'
results, diff = time_quad_ex
assert results
this_fast_on_linux: float = 3
this_fast = (
6 if non_linux
else 3
else this_fast_on_linux
)
# ^ XXX NOTE,
# i've noticed that tweaking the CPU governor setting
# to not "always" enable "turbo" mode can result in latency
# which causes this limit to be too little. Not sure if it'd
# be worth it to adjust the linux value based on reading the
# CPU conf from the sys?
#
# For ex, see the `auto-cpufreq` docs on such settings,
# https://github.com/AdnanHodzic/auto-cpufreq?tab=readme-ov-file#example-config-file-contents
#
# HENCE this below latency-headroom compensation logic..
from .conftest import cpu_scaling_factor
headroom: float = cpu_scaling_factor()
if headroom != 1.:
this_fast = this_fast_on_linux * headroom
test_log.warning(
f'Adding latency headroom on linux bc CPU scaling,\n'
f'headroom: {headroom}\n'
f'this_fast_on_linux: {this_fast_on_linux} -> {this_fast}\n'
)
results, diff = time_quad_ex
assert results
assert diff < this_fast
@ -353,7 +377,7 @@ def test_not_fast_enough_quad(
assert results is None
@tractor_test
@tractor_test(timeout=20)
async def test_respawn_consumer_task(
reg_addr: tuple,
spawn_backend: str,

View File

@ -1,5 +1,5 @@
"""
Arbiter and "local" actor api
Registrar and "local" actor api
"""
import time
@ -12,11 +12,11 @@ from tractor._testing import tractor_test
@pytest.mark.trio
async def test_no_runtime():
"""An arbitter must be established before any nurseries
"""A registrar must be established before any nurseries
can be created.
(In other words ``tractor.open_root_actor()`` must be engaged at
some point?)
(In other words ``tractor.open_root_actor()`` must be
engaged at some point?)
"""
with pytest.raises(RuntimeError) :
async with tractor.find_actor('doggy'):
@ -25,9 +25,9 @@ async def test_no_runtime():
@tractor_test
async def test_self_is_registered(reg_addr):
"Verify waiting on the arbiter to register itself using the standard api."
"Verify waiting on the registrar to register itself using the standard api."
actor = tractor.current_actor()
assert actor.is_arbiter
assert actor.is_registrar
with trio.fail_after(0.2):
async with tractor.wait_for_actor('root') as portal:
assert portal.channel.uid[0] == 'root'
@ -35,11 +35,11 @@ async def test_self_is_registered(reg_addr):
@tractor_test
async def test_self_is_registered_localportal(reg_addr):
"Verify waiting on the arbiter to register itself using a local portal."
"Verify waiting on the registrar to register itself using a local portal."
actor = tractor.current_actor()
assert actor.is_arbiter
assert actor.is_registrar
async with tractor.get_registry(reg_addr) as portal:
assert isinstance(portal, tractor._portal.LocalPortal)
assert isinstance(portal, tractor.runtime._portal.LocalPortal)
with trio.fail_after(0.2):
sockaddr = await portal.run_from_ns(
@ -57,8 +57,8 @@ def test_local_actor_async_func(reg_addr):
async with tractor.open_root_actor(
registry_addrs=[reg_addr],
):
# arbiter is started in-proc if dne
assert tractor.current_actor().is_arbiter
# registrar is started in-proc if dne
assert tractor.current_actor().is_registrar
for i in range(10):
nums.append(i)

View File

@ -17,11 +17,11 @@ from tractor._testing import (
)
from tractor import (
current_actor,
_state,
Actor,
Context,
Portal,
)
from tractor.runtime import _state
from .conftest import (
sig_prog,
_INT_SIGNAL,
@ -30,7 +30,7 @@ from .conftest import (
if TYPE_CHECKING:
from tractor.msg import Aid
from tractor._addr import (
from tractor.discovery._addr import (
UnwrappedAddress,
)
@ -53,19 +53,19 @@ def test_abort_on_sigint(
@tractor_test
async def test_cancel_remote_arbiter(
async def test_cancel_remote_registrar(
daemon: subprocess.Popen,
reg_addr: UnwrappedAddress,
):
assert not current_actor().is_arbiter
assert not current_actor().is_registrar
async with tractor.get_registry(reg_addr) as portal:
await portal.cancel_actor()
time.sleep(0.1)
# the arbiter channel server is cancelled but not its main task
# the registrar channel server is cancelled but not its main task
assert daemon.returncode is None
# no arbiter socket should exist
# no registrar socket should exist
with pytest.raises(OSError):
async with tractor.get_registry(reg_addr) as portal:
pass
@ -80,7 +80,7 @@ def test_register_duplicate_name(
registry_addrs=[reg_addr],
) as an:
assert not current_actor().is_arbiter
assert not current_actor().is_registrar
p1 = await an.start_actor('doggy')
p2 = await an.start_actor('doggy')
@ -122,7 +122,7 @@ async def get_root_portal(
# connect back to our immediate parent which should also
# be the actor-tree's root.
from tractor._discovery import get_root
from tractor.discovery._discovery import get_root
ptl: Portal
async with get_root() as ptl:
root_aid: Aid = ptl.chan.aid

View File

@ -94,15 +94,15 @@ def test_runtime_vars_unset(
after the root actor-runtime exits!
'''
assert not tractor._state._runtime_vars['_debug_mode']
assert not tractor.runtime._state._runtime_vars['_debug_mode']
async def main():
assert not tractor._state._runtime_vars['_debug_mode']
assert not tractor.runtime._state._runtime_vars['_debug_mode']
async with tractor.open_nursery(
debug_mode=True,
):
assert tractor._state._runtime_vars['_debug_mode']
assert tractor.runtime._state._runtime_vars['_debug_mode']
# after runtime closure, should be reverted!
assert not tractor._state._runtime_vars['_debug_mode']
assert not tractor.runtime._state._runtime_vars['_debug_mode']
trio.run(main)

View File

@ -110,7 +110,7 @@ def test_rpc_errors(
) as n:
actor = tractor.current_actor()
assert actor.is_arbiter
assert actor.is_registrar
await n.run_in_actor(
sleep_back_actor,
actor_name=subactor_requests_to,

View File

@ -39,7 +39,7 @@ async def spawn(
):
# now runtime exists
actor: tractor.Actor = tractor.current_actor()
assert actor.is_arbiter == should_be_root
assert actor.is_registrar == should_be_root
# spawns subproc here
portal: tractor.Portal = await an.run_in_actor(
@ -68,7 +68,7 @@ async def spawn(
assert result == 10
return result
else:
assert actor.is_arbiter == should_be_root
assert actor.is_registrar == should_be_root
return 10
@ -181,7 +181,7 @@ def test_loglevel_propagated_to_subactor(
async def main():
async with tractor.open_nursery(
name='arbiter',
name='registrar',
start_method=start_method,
arbiter_addr=reg_addr,

View File

@ -30,21 +30,23 @@ from ._streaming import (
MsgStream as MsgStream,
stream as stream,
)
from ._discovery import (
from .discovery._discovery import (
get_registry as get_registry,
find_actor as find_actor,
wait_for_actor as wait_for_actor,
query_actor as query_actor,
)
from ._supervise import (
from .runtime._supervise import (
open_nursery as open_nursery,
ActorNursery as ActorNursery,
)
from ._state import (
from .runtime._state import (
RuntimeVars as RuntimeVars,
current_actor as current_actor,
is_root_process as is_root_process,
current_ipc_ctx as current_ipc_ctx,
debug_mode as debug_mode
debug_mode as debug_mode,
get_runtime_vars as get_runtime_vars,
is_root_process as is_root_process,
)
from ._exceptions import (
ContextCancelled as ContextCancelled,
@ -65,6 +67,10 @@ from ._root import (
open_root_actor as open_root_actor,
)
from .ipc import Channel as Channel
from ._portal import Portal as Portal
from ._runtime import Actor as Actor
from .runtime._portal import Portal as Portal
from .runtime._runtime import Actor as Actor
from .discovery._registry import (
Registrar as Registrar,
Arbiter as Arbiter,
)
# from . import hilevel as hilevel

View File

@ -22,8 +22,8 @@ import argparse
from ast import literal_eval
from ._runtime import Actor
from ._entry import _trio_main
from .runtime._runtime import Actor
from .spawn._entry import _trio_main
def parse_uid(arg):

View File

@ -97,7 +97,7 @@ from ._streaming import (
MsgStream,
open_stream_from_ctx,
)
from ._state import (
from .runtime._state import (
current_actor,
debug_mode,
_ctxvar_Context,
@ -107,8 +107,8 @@ from .trionics import (
)
# ------ - ------
if TYPE_CHECKING:
from ._portal import Portal
from ._runtime import Actor
from .runtime._portal import Portal
from .runtime._runtime import Actor
from .ipc._transport import MsgTransport
from .devx._frame_stack import (
CallerInfo,

View File

@ -43,7 +43,7 @@ from msgspec import (
ValidationError,
)
from tractor._state import current_actor
from tractor.runtime._state import current_actor
from tractor.log import get_logger
from tractor.msg import (
Error,
@ -187,6 +187,30 @@ _body_fields: list[str] = list(
)
def reg_err_types(
    exc_types: list[Type[Exception]],
) -> None:
    '''
    Register custom exception types for local lookup.

    This lets an external `tractor`-using app code base declare
    error types which it expects to be raised remotely, enabling
    them to be re-raised on the receiver side of an inter-actor
    IPC dialog.

    '''
    for etype in exc_types:
        log.debug(
            f'Register custom exception,\n'
            f'{etype!r}\n'
        )
        # expose the type as an attr of this module so the
        # name-based lookup machinery can resolve it later.
        setattr(
            _this_mod,
            etype.__name__,
            etype,
        )
def get_err_type(type_name: str) -> BaseException|None:
'''
Look up an exception type by name from the set of locally known

View File

@ -37,19 +37,20 @@ import warnings
import trio
from . import _runtime
from .runtime import _runtime
from .discovery._registry import Registrar
from .devx import (
debug,
_frame_stack,
pformat as _pformat,
)
from . import _spawn
from . import _state
from .spawn import _spawn
from .runtime import _state
from . import log
from .ipc import (
_connect_chan,
)
from ._addr import (
from .discovery._addr import (
Address,
UnwrappedAddress,
default_lo_addrs,
@ -267,7 +268,6 @@ async def open_root_actor(
if start_method is not None:
_spawn.try_set_start_method(start_method)
# TODO! remove this ASAP!
if arbiter_addr is not None:
warnings.warn(
'`arbiter_addr` is now deprecated\n'
@ -400,7 +400,7 @@ async def open_root_actor(
'registry socket(s) already bound'
)
# we were able to connect to an arbiter
# we were able to connect to a registrar
logger.info(
f'Registry(s) seem(s) to exist @ {ponged_addrs}'
)
@ -453,8 +453,7 @@ async def open_root_actor(
# https://github.com/goodboy/tractor/pull/348
# https://github.com/goodboy/tractor/issues/296
# TODO: rename as `RootActor` or is that even necessary?
actor = _runtime.Arbiter(
actor = Registrar(
name=name or 'registrar',
uuid=mk_uuid(),
registry_addrs=registry_addrs,

View File

@ -55,7 +55,7 @@ from tractor.msg import (
)
if TYPE_CHECKING:
from ._runtime import Actor
from .runtime._runtime import Actor
from ._context import Context
from .ipc import Channel

View File

@ -26,9 +26,7 @@ import random
from typing import (
Type,
)
from tractor import (
_addr,
)
from tractor.discovery import _addr
def get_rando_addr(

View File

@ -21,17 +21,27 @@ and applications.
'''
from functools import (
partial,
wraps,
)
import inspect
import platform
from typing import (
Callable,
Type,
)
import pytest
import tractor
import trio
import wrapt
def tractor_test(fn):
def tractor_test(
wrapped: Callable|None = None,
*,
# @tractor_test(<deco-params>)
timeout:float = 30,
hide_tb: bool = True,
):
'''
Decorator for async test fns to decorator-wrap them as "native"
looking sync funcs runnable by `pytest` and auto invoked with
@ -45,8 +55,18 @@ def tractor_test(fn):
Basic deco use:
---------------
@tractor_test
async def test_whatever():
@tractor_test(
timeout=10,
)
async def test_whatever(
# fixture param declarations
loglevel: str,
start_method: str,
reg_addr: tuple,
tpt_proto: str,
debug_mode: bool,
):
# already inside a root-actor runtime `trio.Task`
await ...
@ -55,7 +75,7 @@ def tractor_test(fn):
If any of the following fixture are requested by the wrapped test
fn (via normal func-args declaration),
- `reg_addr` (a socket addr tuple where arbiter is listening)
- `reg_addr` (a socket addr tuple where registrar is listening)
- `loglevel` (logging level passed to tractor internals)
- `start_method` (subprocess spawning backend)
@ -67,52 +87,69 @@ def tractor_test(fn):
`tractor.open_root_actor()` funcargs.
'''
@wraps(fn)
__tracebackhide__: bool = hide_tb
# handle the decorator not called with () case.
# i.e. in `wrapt` support a deco-with-optional-args,
# https://wrapt.readthedocs.io/en/master/decorators.html#decorators-with-optional-arguments
if wrapped is None:
return wrapt.PartialCallableObjectProxy(
tractor_test,
timeout=timeout,
hide_tb=hide_tb
)
@wrapt.decorator
def wrapper(
wrapped: Callable,
instance: object|Type|None,
args: tuple,
kwargs: dict,
):
__tracebackhide__: bool = hide_tb
# NOTE, ensure we inject any test-fn declared fixture names.
for kw in [
'reg_addr',
'loglevel',
'start_method',
'debug_mode',
'tpt_proto',
'timeout',
]:
if kw in inspect.signature(wrapped).parameters:
assert kw in kwargs
start_method = kwargs.get('start_method')
if platform.system() == "Windows":
if start_method is None:
kwargs['start_method'] = 'trio'
elif start_method != 'trio':
raise ValueError(
'ONLY the `start_method="trio"` is supported on Windows.'
)
# open a root-actor, passing certain
# `tractor`-runtime-settings, then invoke the test-fn body as
# the root-most task.
#
# https://wrapt.readthedocs.io/en/master/decorators.html#processing-function-arguments
async def _main(
*args,
loglevel=None,
reg_addr=None,
# runtime-settings
loglevel:str|None = None,
reg_addr:tuple|None = None,
start_method: str|None = None,
debug_mode: bool = False,
tpt_proto: str|None = None,
**kwargs
**kwargs,
):
# __tracebackhide__ = True
__tracebackhide__: bool = hide_tb
# NOTE: inject ant test func declared fixture
# names by manually checking!
if 'reg_addr' in inspect.signature(fn).parameters:
# injects test suite fixture value to test as well
# as `run()`
kwargs['reg_addr'] = reg_addr
if 'loglevel' in inspect.signature(fn).parameters:
# allows test suites to define a 'loglevel' fixture
# that activates the internal logging
kwargs['loglevel'] = loglevel
if start_method is None:
if platform.system() == "Windows":
start_method = 'trio'
if 'start_method' in inspect.signature(fn).parameters:
# set of subprocess spawning backends
kwargs['start_method'] = start_method
if 'debug_mode' in inspect.signature(fn).parameters:
# set of subprocess spawning backends
kwargs['debug_mode'] = debug_mode
if 'tpt_proto' in inspect.signature(fn).parameters:
# set of subprocess spawning backends
kwargs['tpt_proto'] = tpt_proto
if kwargs:
# use explicit root actor start
async def _main():
with trio.fail_after(timeout):
async with tractor.open_root_actor(
# **kwargs,
registry_addrs=[reg_addr] if reg_addr else None,
loglevel=loglevel,
start_method=start_method,
@ -121,17 +158,31 @@ def tractor_test(fn):
debug_mode=debug_mode,
):
await fn(*args, **kwargs)
# invoke test-fn body IN THIS task
await wrapped(
*args,
**kwargs,
)
main = _main
funcname = wrapped.__name__
if not inspect.iscoroutinefunction(wrapped):
raise TypeError(
f"Test-fn {funcname!r} must be an async-function !!"
)
else:
# use implicit root actor start
main = partial(fn, *args, **kwargs)
# invoke runtime via a root task.
return trio.run(
partial(
_main,
*args,
**kwargs,
)
)
return trio.run(main)
return wrapper
return wrapper(
wrapped,
)
def pytest_addoption(
@ -179,7 +230,8 @@ def pytest_addoption(
def pytest_configure(config):
backend = config.option.spawn_backend
tractor._spawn.try_set_start_method(backend)
from tractor.spawn._spawn import try_set_start_method
try_set_start_method(backend)
# register custom marks to avoid warnings see,
# https://docs.pytest.org/en/stable/how-to/writing_plugins.html#registering-custom-markers
@ -225,7 +277,8 @@ def tpt_protos(request) -> list[str]:
# XXX ensure we support the protocol by name via lookup!
for proto_key in proto_keys:
addr_type = tractor._addr._address_types[proto_key]
from tractor.discovery import _addr
addr_type = _addr._address_types[proto_key]
assert addr_type.proto_key == proto_key
yield proto_keys
@ -256,7 +309,7 @@ def tpt_proto(
# f'tpt-proto={proto_key!r}\n'
# )
from tractor import _state
from tractor.runtime import _state
if _state._def_tpt_proto != proto_key:
_state._def_tpt_proto = proto_key
_state._runtime_vars['_enable_tpts'] = [

View File

@ -45,17 +45,15 @@ from typing import (
)
import trio
from tractor import (
_state,
log as logmod,
)
from tractor.runtime import _state
from tractor import log as logmod
from tractor.devx import debug
log = logmod.get_logger()
if TYPE_CHECKING:
from tractor._spawn import ProcessType
from tractor.spawn._spawn import ProcessType
from tractor import (
Actor,
ActorNursery,

View File

@ -53,8 +53,8 @@ import trio
from tractor._exceptions import (
NoRuntime,
)
from tractor import _state
from tractor._state import (
from tractor.runtime import _state
from tractor.runtime._state import (
current_actor,
debug_mode,
)
@ -76,7 +76,7 @@ from ._repl import (
if TYPE_CHECKING:
from trio.lowlevel import Task
from tractor._runtime import (
from tractor.runtime._runtime import (
Actor,
)

View File

@ -25,7 +25,7 @@ from functools import (
import os
import pdbp
from tractor._state import (
from tractor.runtime._state import (
is_root_process,
)

View File

@ -27,7 +27,7 @@ from typing import (
)
import trio
from tractor.log import get_logger
from tractor._state import (
from tractor.runtime._state import (
current_actor,
is_root_process,
)
@ -44,7 +44,7 @@ if TYPE_CHECKING:
from tractor.ipc import (
Channel,
)
from tractor._runtime import (
from tractor.runtime._runtime import (
Actor,
)

View File

@ -40,7 +40,7 @@ from trio.lowlevel import (
Task,
)
from tractor._context import Context
from tractor._state import (
from tractor.runtime._state import (
current_actor,
debug_mode,
is_root_process,

View File

@ -55,12 +55,12 @@ import tractor
from tractor.log import get_logger
from tractor.to_asyncio import run_trio_task_in_future
from tractor._context import Context
from tractor import _state
from tractor.runtime import _state
from tractor._exceptions import (
NoRuntime,
InternalError,
)
from tractor._state import (
from tractor.runtime._state import (
current_actor,
current_ipc_ctx,
is_root_process,
@ -87,7 +87,7 @@ from ..pformat import (
if TYPE_CHECKING:
from trio.lowlevel import Task
from threading import Thread
from tractor._runtime import (
from tractor.runtime._runtime import (
Actor,
)
# from ._post_mortem import BoxedMaybeException

View File

@ -55,12 +55,12 @@ import tractor
from tractor.to_asyncio import run_trio_task_in_future
from tractor.log import get_logger
from tractor._context import Context
from tractor import _state
from tractor.runtime import _state
from tractor._exceptions import (
DebugRequestError,
InternalError,
)
from tractor._state import (
from tractor.runtime._state import (
current_actor,
is_root_process,
)
@ -71,7 +71,7 @@ if TYPE_CHECKING:
from tractor.ipc import (
IPCServer,
)
from tractor._runtime import (
from tractor.runtime._runtime import (
Actor,
)
from ._repl import (
@ -1013,7 +1013,7 @@ async def request_root_stdio_lock(
DebugStatus.req_task = current_task()
req_err: BaseException|None = None
try:
from tractor._discovery import get_root
from tractor.discovery._discovery import get_root
# NOTE: we need this to ensure that this task exits
# BEFORE the REPl instance raises an error like
# `bdb.BdbQuit` directly, OW you get a trio cs stack

View File

@ -0,0 +1,26 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Discovery (protocols) API for automatic addressing
and location management of (service) actors.
NOTE: to avoid circular imports, this ``__init__``
does NOT eagerly import submodules. Use direct
module paths like ``tractor.discovery._addr`` or
``tractor.discovery._discovery`` instead.
'''

View File

@ -27,15 +27,15 @@ from trio import (
SocketListener,
)
from .log import get_logger
from ._state import (
from ..log import get_logger
from ..runtime._state import (
_def_tpt_proto,
)
from .ipc._tcp import TCPAddress
from .ipc._uds import UDSAddress
from ..ipc._tcp import TCPAddress
from ..ipc._uds import UDSAddress
if TYPE_CHECKING:
from ._runtime import Actor
from ..runtime._runtime import Actor
log = get_logger()

View File

@ -28,29 +28,29 @@ from typing import (
from contextlib import asynccontextmanager as acm
from tractor.log import get_logger
from .trionics import (
from ..trionics import (
gather_contexts,
collapse_eg,
)
from .ipc import _connect_chan, Channel
from ..ipc import _connect_chan, Channel
from ._addr import (
UnwrappedAddress,
Address,
wrap_address
)
from ._portal import (
from ..runtime._portal import (
Portal,
open_portal,
LocalPortal,
)
from ._state import (
from ..runtime._state import (
current_actor,
_runtime_vars,
_def_tpt_proto,
)
if TYPE_CHECKING:
from ._runtime import Actor
from ..runtime._runtime import Actor
log = get_logger()
@ -72,8 +72,8 @@ async def get_registry(
'''
actor: Actor = current_actor()
if actor.is_registrar:
# we're already the arbiter
# (likely a re-entrant call from the arbiter actor)
# we're already the registrar
# (likely a re-entrant call from the registrar actor)
yield LocalPortal(
actor,
Channel(transport=None)
@ -268,10 +268,10 @@ async def find_actor(
None,
]:
'''
Ask the arbiter to find actor(s) by name.
Ask the registrar to find actor(s) by name.
Returns a connected portal to the last registered matching actor
known to the arbiter.
Returns a connected portal to the last registered
matching actor known to the registrar.
'''
# optimization path, use any pre-existing peer channel

View File

@ -0,0 +1,253 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public
# License as published by the Free Software Foundation, either
# version 3 of the License, or (at your option) any later
# version.
# This program is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
# PURPOSE. See the GNU Affero General Public License for more
# details.
# You should have received a copy of the GNU Affero General
# Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
'''
Actor-registry for process-tree service discovery.
The `Registrar` is a special `Actor` subtype that serves as
the process-tree's name-registry, tracking actor
name-to-address mappings so peers can discover each other.
'''
from __future__ import annotations
from bidict import bidict
import trio
from ..runtime._runtime import Actor
from ._addr import (
UnwrappedAddress,
Address,
wrap_address,
)
from ..devx import debug
from ..log import get_logger
log = get_logger('tractor')
class Registrar(Actor):
'''
A special registrar `Actor` who can contact all other
actors within its immediate process tree and keeps
a registry of others meant to be discoverable in
a distributed application.
Normally the registrar is also the "root actor" and
thus always has access to the top-most-level actor
(process) nursery.
By default, the registrar is always initialized when
and if no other registrar socket addrs have been
specified to runtime init entry-points (such as
`open_root_actor()` or `open_nursery()`). Any time
a new main process is launched (and thus a new root
actor created) and, no existing registrar can be
contacted at the provided `registry_addr`, then
a new one is always created; however, if one can be
reached it is used.
Normally a distributed app requires at least one
registrar per logical host where for that given
"host space" (aka localhost IPC domain of addresses)
it is responsible for making all other host (local
address) bound actors *discoverable* to external
actor trees running on remote hosts.
'''
is_registrar = True
def is_registry(self) -> bool:
return self.is_registrar
def __init__(
self,
*args,
**kwargs,
) -> None:
self._registry: bidict[
tuple[str, str],
UnwrappedAddress,
] = bidict({})
self._waiters: dict[
str,
# either an event to sync to receiving an
# actor uid (which is filled in once the actor
# has sucessfully registered), or that uid
# after registry is complete.
list[trio.Event|tuple[str, str]]
] = {}
super().__init__(*args, **kwargs)
async def find_actor(
self,
name: str,
) -> UnwrappedAddress|None:
for uid, addr in self._registry.items():
if name in uid:
return addr
return None
async def get_registry(
self
) -> dict[str, UnwrappedAddress]:
'''
Return current name registry.
This method is async to allow for cross-actor
invocation.
'''
# NOTE: requires ``strict_map_key=False`` to the
# msgpack unpacker since we have tuples as keys
# (note this makes the registrar suscetible to
# hashdos):
# https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10
return {
'.'.join(key): val
for key, val in self._registry.items()
}
async def wait_for_actor(
self,
name: str,
) -> list[UnwrappedAddress]:
'''
Wait for a particular actor to register.
This is a blocking call if no actor by the
provided name is currently registered.
'''
addrs: list[UnwrappedAddress] = []
addr: UnwrappedAddress
mailbox_info: str = (
'Actor registry contact infos:\n'
)
for uid, addr in self._registry.items():
mailbox_info += (
f'|_uid: {uid}\n'
f'|_addr: {addr}\n\n'
)
if name == uid[0]:
addrs.append(addr)
if not addrs:
waiter = trio.Event()
self._waiters.setdefault(
name, []
).append(waiter)
await waiter.wait()
for uid in self._waiters[name]:
if not isinstance(uid, trio.Event):
addrs.append(
self._registry[uid]
)
log.runtime(mailbox_info)
return addrs
async def register_actor(
self,
uid: tuple[str, str],
addr: UnwrappedAddress
) -> None:
uid = name, hash = (
str(uid[0]),
str(uid[1]),
)
waddr: Address = wrap_address(addr)
if not waddr.is_valid:
# should never be 0-dynamic-os-alloc
await debug.pause()
# XXX NOTE, value must also be hashable AND since
# `._registry` is a `bidict` values must be unique;
# use `.forceput()` to replace any prior (stale)
# entries that might map a different uid to the same
# addr (e.g. after an unclean shutdown or
# actor-restart reusing the same address).
self._registry.forceput(uid, tuple(addr))
# pop and signal all waiter events
events = self._waiters.pop(name, [])
self._waiters.setdefault(
name, []
).append(uid)
for event in events:
if isinstance(event, trio.Event):
event.set()
async def unregister_actor(
self,
uid: tuple[str, str]
) -> None:
uid = (str(uid[0]), str(uid[1]))
entry: tuple = self._registry.pop(
uid, None
)
if entry is None:
log.warning(
f'Request to de-register'
f' {uid!r} failed?'
)
async def delete_addr(
self,
addr: tuple[str, int|str]|list[str|int],
) -> tuple[str, str]|None:
# NOTE: `addr` arrives as a `list` over IPC
# (msgpack deserializes tuples -> lists) so
# coerce to `tuple` for the bidict hash lookup.
uid: tuple[str, str]|None = (
self._registry.inverse.pop(
tuple(addr),
None,
)
)
if uid:
report: str = (
'Deleting registry-entry for,\n'
)
else:
report: str = (
'No registry entry for,\n'
)
log.warning(
report
+
f'{addr!r}@{uid!r}'
)
return uid
# Backward compat alias
Arbiter = Registrar

View File

@ -244,8 +244,12 @@ def pub(
task2lock[name] = trio.StrictFIFOLock()
@wrapt.decorator
async def wrapper(agen, instance, args, kwargs):
async def wrapper(
agen,
instance,
args,
kwargs,
):
# XXX: this is used to extract arguments properly as per the
# `wrapt` docs
async def _execute(

View File

@ -39,7 +39,7 @@ from ._types import (
transport_from_addr,
transport_from_stream,
)
from tractor._addr import (
from tractor.discovery._addr import (
is_wrapped_addr,
wrap_address,
Address,

View File

@ -50,26 +50,24 @@ from ..devx.pformat import (
from .._exceptions import (
TransportClosed,
)
from .. import _rpc
from ..runtime import _rpc
from ..msg import (
MsgType,
Struct,
types as msgtypes,
)
from ..trionics import maybe_open_nursery
from .. import (
_state,
log,
)
from .._addr import Address
from ..runtime import _state
from .. import log
from ..discovery._addr import Address
from ._chan import Channel
from ._transport import MsgTransport
from ._uds import UDSAddress
from ._tcp import TCPAddress
if TYPE_CHECKING:
from .._runtime import Actor
from .._supervise import ActorNursery
from ..runtime._runtime import Actor
from ..runtime._supervise import ActorNursery
log = log.get_logger()
@ -357,7 +355,7 @@ async def handle_stream_from_peer(
# and `MsgpackStream._inter_packets()` on a read from the
# stream particularly when the runtime is first starting up
# inside `open_root_actor()` where there is a check for
# a bound listener on the "arbiter" addr. the reset will be
# a bound listener on the registrar addr. the reset will be
# because the handshake was never meant took place.
log.runtime(
con_status
@ -970,7 +968,7 @@ class Server(Struct):
in `accept_addrs`.
'''
from .._addr import (
from ..discovery._addr import (
default_lo_addrs,
wrap_address,
)

View File

@ -54,7 +54,7 @@ from tractor.msg import (
)
if TYPE_CHECKING:
from tractor._addr import Address
from tractor.discovery._addr import Address
log = get_logger()
@ -225,7 +225,7 @@ class MsgpackTransport(MsgTransport):
# not sure entirely why we need this but without it we
# seem to be getting racy failures here on
# arbiter/registry name subs..
# registrar name subs..
trio.BrokenResourceError,
) as trans_err:

View File

@ -53,14 +53,14 @@ from tractor.log import get_logger
from tractor.ipc._transport import (
MsgpackTransport,
)
from tractor._state import (
from tractor.runtime._state import (
get_rt_dir,
current_actor,
is_root_process,
)
if TYPE_CHECKING:
from ._runtime import Actor
from tractor.runtime._runtime import Actor
# Platform-specific credential passing constants

View File

@ -47,7 +47,7 @@ import colorlog # type: ignore
# import colored_traceback.auto # ?TODO, need better config?
import trio
from ._state import current_actor
from .runtime._state import current_actor
_default_loglevel: str = 'ERROR'

View File

@ -50,7 +50,7 @@ from tractor._exceptions import (
_mk_recv_mte,
pack_error,
)
from tractor._state import (
from tractor.runtime._state import (
current_ipc_ctx,
)
from ._codec import (

View File

@ -0,0 +1,26 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
The actor runtime: core machinery for the
actor-model implemented on a `trio` task runtime.
NOTE: to avoid circular imports, this ``__init__``
does NOT eagerly import submodules. Use direct
module paths like ``tractor.runtime._state`` or
``tractor.runtime._runtime`` instead.
'''

View File

@ -39,30 +39,30 @@ import warnings
import trio
from .trionics import (
from ..trionics import (
maybe_open_nursery,
collapse_eg,
)
from ._state import (
current_actor,
)
from .ipc import Channel
from .log import get_logger
from .msg import (
from ..ipc import Channel
from ..log import get_logger
from ..msg import (
# Error,
PayloadMsg,
NamespacePath,
Return,
)
from ._exceptions import (
from .._exceptions import (
NoResult,
TransportClosed,
)
from ._context import (
from .._context import (
Context,
open_context_from_portal,
)
from ._streaming import (
from .._streaming import (
MsgStream,
)

View File

@ -43,11 +43,11 @@ from trio import (
TaskStatus,
)
from .ipc import Channel
from ._context import (
from ..ipc import Channel
from .._context import (
Context,
)
from ._exceptions import (
from .._exceptions import (
ContextCancelled,
RemoteActorError,
ModuleNotExposed,
@ -56,19 +56,19 @@ from ._exceptions import (
pack_error,
unpack_error,
)
from .trionics import (
from ..trionics import (
collapse_eg,
is_multi_cancelled,
maybe_raise_from_masking_exc,
)
from .devx import (
from ..devx import (
debug,
add_div,
pformat as _pformat,
)
from . import _state
from .log import get_logger
from .msg import (
from ..log import get_logger
from ..msg import (
current_codec,
MsgCodec,
PayloadT,

View File

@ -68,7 +68,6 @@ import textwrap
from types import ModuleType
import warnings
from bidict import bidict
import trio
from trio._core import _run as trio_runtime
from trio import (
@ -84,46 +83,46 @@ from tractor.msg import (
pretty_struct,
types as msgtypes,
)
from .trionics import (
from ..trionics import (
collapse_eg,
maybe_open_nursery,
)
from .ipc import (
from ..ipc import (
Channel,
# IPCServer, # causes cycles atm..
_server,
)
from ._addr import (
from ..discovery._addr import (
UnwrappedAddress,
Address,
# default_lo_addrs,
get_address_cls,
wrap_address,
)
from ._context import (
from .._context import (
mk_context,
Context,
)
from .log import get_logger
from ._exceptions import (
from ..log import get_logger
from .._exceptions import (
ContextCancelled,
InternalError,
ModuleNotExposed,
MsgTypeError,
unpack_error,
)
from .devx import (
from ..devx import (
debug,
pformat as _pformat
)
from ._discovery import get_registry
from ..discovery._discovery import get_registry
from ._portal import Portal
from . import _state
from . import _mp_fixup_main
from ..spawn import _mp_fixup_main
from . import _rpc
if TYPE_CHECKING:
from ._supervise import ActorNursery
from ._supervise import ActorNursery # noqa
from trio._channel import MemoryChannelState
@ -176,13 +175,21 @@ class Actor:
dialog.
'''
# ugh, we need to get rid of this and replace with a "registry" sys
# https://github.com/goodboy/tractor/issues/216
is_arbiter: bool = False
is_registrar: bool = False
@property
def is_registrar(self) -> bool:
return self.is_arbiter
def is_arbiter(self) -> bool:
'''
Deprecated, use `.is_registrar`.
'''
warnings.warn(
'`Actor.is_arbiter` is deprecated.\n'
'Use `.is_registrar` instead.',
DeprecationWarning,
stacklevel=2,
)
return self.is_registrar
@property
def is_root(self) -> bool:
@ -238,7 +245,6 @@ class Actor:
registry_addrs: list[Address]|None = None,
spawn_method: str|None = None,
# TODO: remove!
arbiter_addr: UnwrappedAddress|None = None,
) -> None:
@ -288,8 +294,8 @@ class Actor:
]
# marked by the process spawning backend at startup
# will be None for the parent most process started manually
# by the user (currently called the "arbiter")
# will be None for the parent most process started
# manually by the user (the "registrar")
self._spawn_method: str = spawn_method
# RPC state
@ -908,7 +914,7 @@ class Actor:
# TODO! -[ ] another `Struct` for rtvs..
rvs: dict[str, Any] = spawnspec._runtime_vars
if rvs['_debug_mode']:
from .devx import (
from ..devx import (
enable_stack_on_sig,
maybe_init_greenback,
)
@ -1657,7 +1663,7 @@ async def async_main(
# TODO, just read direct from ipc_server?
accept_addrs: list[UnwrappedAddress] = actor.accept_addrs
# Register with the arbiter if we're told its addr
# Register with the registrar if we're told its addr
log.runtime(
f'Registering `{actor.name}` => {pformat(accept_addrs)}\n'
# ^-TODO-^ we should instead show the maddr here^^
@ -1881,184 +1887,8 @@ async def async_main(
log.runtime(teardown_report)
# TODO: rename to `Registry` and move to `.discovery._registry`!
class Arbiter(Actor):
'''
A special registrar (and for now..) `Actor` who can contact all
other actors within its immediate process tree and possibly keeps
a registry of others meant to be discoverable in a distributed
application. Normally the registrar is also the "root actor" and
thus always has access to the top-most-level actor (process)
nursery.
By default, the registrar is always initialized when and if no
other registrar socket addrs have been specified to runtime
init entry-points (such as `open_root_actor()` or
`open_nursery()`). Any time a new main process is launched (and
thus thus a new root actor created) and, no existing registrar
can be contacted at the provided `registry_addr`, then a new
one is always created; however, if one can be reached it is
used.
Normally a distributed app requires at least registrar per
logical host where for that given "host space" (aka localhost
IPC domain of addresses) it is responsible for making all other
host (local address) bound actors *discoverable* to external
actor trees running on remote hosts.
'''
is_arbiter = True
# TODO, implement this as a read on there existing a `._state` of
# some sort setup by whenever we impl this all as
# a `.discovery._registry.open_registry()` API
def is_registry(self) -> bool:
return self.is_arbiter
def __init__(
self,
*args,
**kwargs,
) -> None:
self._registry: bidict[
tuple[str, str],
UnwrappedAddress,
] = bidict({})
self._waiters: dict[
str,
# either an event to sync to receiving an actor uid (which
# is filled in once the actor has sucessfully registered),
# or that uid after registry is complete.
list[trio.Event | tuple[str, str]]
] = {}
super().__init__(*args, **kwargs)
async def find_actor(
self,
name: str,
) -> UnwrappedAddress|None:
for uid, addr in self._registry.items():
if name in uid:
return addr
return None
async def get_registry(
self
) -> dict[str, UnwrappedAddress]:
'''
Return current name registry.
This method is async to allow for cross-actor invocation.
'''
# NOTE: requires ``strict_map_key=False`` to the msgpack
# unpacker since we have tuples as keys (not this makes the
# arbiter suscetible to hashdos):
# https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10
return {
'.'.join(key): val
for key, val in self._registry.items()
}
async def wait_for_actor(
self,
name: str,
) -> list[UnwrappedAddress]:
'''
Wait for a particular actor to register.
This is a blocking call if no actor by the provided name is currently
registered.
'''
addrs: list[UnwrappedAddress] = []
addr: UnwrappedAddress
mailbox_info: str = 'Actor registry contact infos:\n'
for uid, addr in self._registry.items():
mailbox_info += (
f'|_uid: {uid}\n'
f'|_addr: {addr}\n\n'
# Backward compat: class moved to discovery._registry
from ..discovery._registry import (
Registrar as Registrar,
)
if name == uid[0]:
addrs.append(addr)
if not addrs:
waiter = trio.Event()
self._waiters.setdefault(name, []).append(waiter)
await waiter.wait()
for uid in self._waiters[name]:
if not isinstance(uid, trio.Event):
addrs.append(self._registry[uid])
log.runtime(mailbox_info)
return addrs
async def register_actor(
self,
uid: tuple[str, str],
addr: UnwrappedAddress
) -> None:
uid = name, hash = (str(uid[0]), str(uid[1]))
waddr: Address = wrap_address(addr)
if not waddr.is_valid:
# should never be 0-dynamic-os-alloc
await debug.pause()
# XXX NOTE, value must also be hashable AND since
# `._registry` is a `bidict` values must be unique; use
# `.forceput()` to replace any prior (stale) entries
# that might map a different uid to the same addr (e.g.
# after an unclean shutdown or actor-restart reusing
# the same address).
self._registry.forceput(uid, tuple(addr))
# pop and signal all waiter events
events = self._waiters.pop(name, [])
self._waiters.setdefault(name, []).append(uid)
for event in events:
if isinstance(event, trio.Event):
event.set()
async def unregister_actor(
self,
uid: tuple[str, str]
) -> None:
uid = (str(uid[0]), str(uid[1]))
entry: tuple = self._registry.pop(uid, None)
if entry is None:
log.warning(
f'Request to de-register {uid!r} failed?'
)
async def delete_addr(
self,
addr: tuple[str, int|str]|list[str|int],
) -> tuple[str, str]|None:
# NOTE: `addr` arrives as a `list` over IPC
# (msgpack deserializes tuples -> lists) so
# coerce to `tuple` for the bidict hash lookup.
uid: tuple[str, str]|None = self._registry.inverse.pop(
tuple(addr),
None,
)
if uid:
report: str = 'Deleting registry-entry for,\n'
else:
report: str = 'No registry entry for,\n'
log.warning(
report
+
f'{addr!r}@{uid!r}'
)
return uid
Arbiter = Registrar

View File

@ -25,6 +25,7 @@ from contextvars import (
from pathlib import Path
from typing import (
Any,
Callable,
Literal,
TYPE_CHECKING,
)
@ -32,9 +33,14 @@ from typing import (
import platformdirs
from trio.lowlevel import current_task
from msgspec import (
field,
Struct,
)
if TYPE_CHECKING:
from ._runtime import Actor
from ._context import Context
from .._context import Context
# default IPC transport protocol settings
@ -47,9 +53,70 @@ _def_tpt_proto: TransportProtocolKey = 'tcp'
_current_actor: Actor|None = None # type: ignore # noqa
_last_actor_terminated: Actor|None = None
# TODO: mk this a `msgspec.Struct`!
# -[ ] type out all fields obvi!
# -[x] type out all fields obvi!
# -[ ] (eventually) mk wire-ready for monitoring?
class RuntimeVars(Struct):
'''
Actor-(and thus process)-global runtime state.
This struct is relayed from parent to child during sub-actor
spawning and is a singleton instance per process.
Generally contains,
- root-actor indicator.
- comms-info: addrs for both (public) process/service-discovery
and in-tree contact with other actors.
- transport-layer IPC protocol server(s) settings.
- debug-mode settings for enabling sync breakpointing and any
surrounding REPL-fixture hooking.
- infected-`asyncio` via guest-mode toggle(s)/cohfig.
'''
_is_root: bool = False # bool
_root_mailbox: tuple[str, str|int] = (None, None) # tuple[str|None, str|None]
_root_addrs: list[
tuple[str, str|int],
] = [] # tuple[str|None, str|None]
# parent->chld ipc protocol caps
_enable_tpts: list[TransportProtocolKey] = field(
default_factory=lambda: [_def_tpt_proto],
)
# registrar info
_registry_addrs: list[tuple] = []
# `debug_mode: bool` settings
_debug_mode: bool = False # bool
repl_fixture: bool|Callable = False # |AbstractContextManager[bool]
# for `tractor.pause_from_sync()` & `breakpoint()` support
use_greenback: bool = False
# infected-`asyncio`-mode: `trio` running as guest.
_is_infected_aio: bool = False
def __setattr__(
self,
key,
val,
) -> None:
breakpoint()
super().__setattr__(key, val)
def update(
self,
from_dict: dict|Struct,
) -> None:
for attr, val in from_dict.items():
setattr(
self,
attr,
val,
)
_runtime_vars: dict[str, Any] = {
# root of actor-process tree info
'_is_root': False, # bool
@ -73,6 +140,23 @@ _runtime_vars: dict[str, Any] = {
}
def get_runtime_vars(
as_dict: bool = True,
) -> dict:
'''
Deliver a **copy** of the current `Actor`'s "runtime variables".
By default, for historical impl reasons, this delivers the `dict`
form, but the `RuntimeVars` struct should be utilized as possible
for future calls.
'''
if as_dict:
return dict(_runtime_vars)
return RuntimeVars(**_runtime_vars)
def last_actor() -> Actor|None:
'''
Try to return last active `Actor` singleton
@ -98,7 +182,7 @@ def current_actor(
_current_actor is None
):
msg: str = 'No local actor has been initialized yet?\n'
from ._exceptions import NoRuntime
from .._exceptions import NoRuntime
if last := last_actor():
msg += (
@ -164,7 +248,7 @@ def current_ipc_ctx(
not ctx
and error_on_not_set
):
from ._exceptions import InternalError
from .._exceptions import InternalError
raise InternalError(
'No IPC context has been allocated for this task yet?\n'
f'|_{current_task()}\n'

View File

@ -30,36 +30,36 @@ import warnings
import trio
from .devx import (
from ..devx import (
debug,
pformat as _pformat,
)
from ._addr import (
from ..discovery._addr import (
UnwrappedAddress,
mk_uuid,
)
from ._state import current_actor, is_main_process
from .log import get_logger, get_loglevel
from ..log import get_logger, get_loglevel
from ._runtime import Actor
from ._portal import Portal
from .trionics import (
from ..trionics import (
is_multi_cancelled,
collapse_eg,
)
from ._exceptions import (
from .._exceptions import (
ContextCancelled,
)
from ._root import (
from .._root import (
open_root_actor,
)
from . import _state
from . import _spawn
from ..spawn import _spawn
if TYPE_CHECKING:
import multiprocessing as mp
# from .ipc._server import IPCServer
from .ipc import IPCServer
# from ..ipc._server import IPCServer
from ..ipc import IPCServer
log = get_logger()

View File

@ -0,0 +1,26 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Actor process spawning machinery using
multiple backends (trio, multiprocessing).
NOTE: to avoid circular imports, this ``__init__``
does NOT eagerly import submodules. Use direct
module paths like ``tractor.spawn._spawn`` or
``tractor.spawn._entry`` instead.
'''

View File

@ -29,19 +29,19 @@ from typing import (
import trio # type: ignore
from .log import (
from ..log import (
get_console_log,
get_logger,
)
from . import _state
from .devx import (
from ..runtime import _state
from ..devx import (
_frame_stack,
pformat,
)
# from .msg import pretty_struct
from .to_asyncio import run_as_asyncio_guest
from ._addr import UnwrappedAddress
from ._runtime import (
# from ..msg import pretty_struct
from ..to_asyncio import run_as_asyncio_guest
from ..discovery._addr import UnwrappedAddress
from ..runtime._runtime import (
async_main,
Actor,
)

View File

@ -125,7 +125,7 @@ class PatchedForkServer(ForkServer):
self._forkserver_pid = None
# XXX only thing that changed!
cmd = ('from tractor._forkserver_override import main; ' +
cmd = ('from tractor.spawn._forkserver_override import main; ' +
'main(%d, %d, %r, **%r)')
if self._preload_modules:

View File

@ -34,11 +34,11 @@ from typing import (
import trio
from trio import TaskStatus
from .devx import (
from ..devx import (
debug,
pformat as _pformat
)
from tractor._state import (
from tractor.runtime._state import (
current_actor,
is_main_process,
is_root_process,
@ -46,10 +46,10 @@ from tractor._state import (
_runtime_vars,
)
from tractor.log import get_logger
from tractor._addr import UnwrappedAddress
from tractor._portal import Portal
from tractor._runtime import Actor
from tractor._entry import _mp_main
from tractor.discovery._addr import UnwrappedAddress
from tractor.runtime._portal import Portal
from tractor.runtime._runtime import Actor
from ._entry import _mp_main
from tractor._exceptions import ActorFailure
from tractor.msg import (
types as msgtypes,
@ -58,11 +58,11 @@ from tractor.msg import (
if TYPE_CHECKING:
from ipc import (
from tractor.ipc import (
_server,
Channel,
)
from ._supervise import ActorNursery
from tractor.runtime._supervise import ActorNursery
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)

View File

@ -43,7 +43,7 @@ from tractor._exceptions import (
AsyncioTaskExited,
AsyncioCancelled,
)
from tractor._state import (
from tractor.runtime._state import (
debug_mode,
_runtime_vars,
)

View File

@ -37,7 +37,7 @@ from typing import (
)
import trio
from tractor._state import current_actor
from tractor.runtime._state import current_actor
from tractor.log import get_logger
# from ._beg import collapse_eg
# from ._taskc import (