Compare commits
50 Commits
8bcf1ea8c2
...
243b9cfe3b
| Author | SHA1 | Date |
|---|---|---|
|
|
243b9cfe3b | |
|
|
4145ff9098 | |
|
|
4e931df685 | |
|
|
2e3a554ca9 | |
|
|
5ec2d45413 | |
|
|
47f790035e | |
|
|
81b02a1193 | |
|
|
8ab8586ae2 | |
|
|
6ea17b714e | |
|
|
1a7e5042fa | |
|
|
0a05ef9fa7 | |
|
|
21c18c915f | |
|
|
de84927779 | |
|
|
6cc66fac72 | |
|
|
bbf01d5161 | |
|
|
ec8e8a2786 | |
|
|
c3d1ec22eb | |
|
|
8f44efa327 | |
|
|
5968a3c773 | |
|
|
80597b80bf | |
|
|
a41c6d5c70 | |
|
|
9c37b3f956 | |
|
|
8f6bc56174 | |
|
|
b14dbde77b | |
|
|
cd6509b724 | |
|
|
93d99ed2eb | |
|
|
6215e3b2dd | |
|
|
be5d8da8c0 | |
|
|
21ed181835 | |
|
|
9ec2749ab7 | |
|
|
f3441a6790 | |
|
|
cc42d38284 | |
|
|
6827ceba12 | |
|
|
94458807ce | |
|
|
be5e7e446b | |
|
|
571b2b320e | |
|
|
c7b5d00f19 | |
|
|
1049f7bf38 | |
|
|
cc3bfac741 | |
|
|
e71eec07de | |
|
|
b557ec20a7 | |
|
|
85457cb839 | |
|
|
850219f60c | |
|
|
d929fb75b5 | |
|
|
403c2174a1 | |
|
|
528012f35f | |
|
|
0dfa6f4a8a | |
|
|
a0d3741fac | |
|
|
149b800c9f | |
|
|
03f458a45c |
|
|
@ -20,7 +20,7 @@ async def sleep(
|
|||
|
||||
|
||||
async def open_ctx(
|
||||
n: tractor._supervise.ActorNursery
|
||||
n: tractor.runtime._supervise.ActorNursery
|
||||
):
|
||||
|
||||
# spawn both actors
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ async def main(service_name):
|
|||
await an.start_actor(service_name)
|
||||
|
||||
async with tractor.get_registry() as portal:
|
||||
print(f"Arbiter is listening on {portal.channel}")
|
||||
print(f"Registrar is listening on {portal.channel}")
|
||||
|
||||
async with tractor.wait_for_actor(service_name) as sockaddr:
|
||||
print(f"my_service is found at {sockaddr}")
|
||||
|
|
|
|||
|
|
@ -9,6 +9,8 @@ import os
|
|||
import signal
|
||||
import platform
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Literal
|
||||
|
||||
import pytest
|
||||
import tractor
|
||||
|
|
@ -52,6 +54,76 @@ no_macos = pytest.mark.skipif(
|
|||
)
|
||||
|
||||
|
||||
def get_cpu_state(
|
||||
icpu: int = 0,
|
||||
setting: Literal[
|
||||
'scaling_governor',
|
||||
'*_pstate_max_freq',
|
||||
'scaling_max_freq',
|
||||
# 'scaling_cur_freq',
|
||||
] = '*_pstate_max_freq',
|
||||
) -> tuple[
|
||||
Path,
|
||||
str|int,
|
||||
]|None:
|
||||
'''
|
||||
Attempt to read the (first) CPU's setting according
|
||||
to the set `setting` from under the file-sys,
|
||||
|
||||
/sys/devices/system/cpu/cpu0/cpufreq/{setting}
|
||||
|
||||
Useful to determine latency headroom for various perf affected
|
||||
test suites.
|
||||
|
||||
'''
|
||||
try:
|
||||
# Read governor for core 0 (usually same for all)
|
||||
setting_path: Path = list(
|
||||
Path(f'/sys/devices/system/cpu/cpu{icpu}/cpufreq/')
|
||||
.glob(f'{setting}')
|
||||
)[0] # <- XXX must be single match!
|
||||
with open(
|
||||
setting_path,
|
||||
'r',
|
||||
) as f:
|
||||
return (
|
||||
setting_path,
|
||||
f.read().strip(),
|
||||
)
|
||||
except (FileNotFoundError, IndexError):
|
||||
return None
|
||||
|
||||
|
||||
def cpu_scaling_factor() -> float:
|
||||
'''
|
||||
Return a latency-headroom multiplier (>= 1.0) reflecting how
|
||||
much to inflate time-limits when CPU-freq scaling is active on
|
||||
linux.
|
||||
|
||||
When no scaling info is available (non-linux, missing sysfs),
|
||||
returns 1.0 (i.e. no headroom adjustment needed).
|
||||
|
||||
'''
|
||||
if _non_linux:
|
||||
return 1.
|
||||
|
||||
mx = get_cpu_state()
|
||||
cur = get_cpu_state(setting='scaling_max_freq')
|
||||
if mx is None or cur is None:
|
||||
return 1.
|
||||
|
||||
_mx_pth, max_freq = mx
|
||||
_cur_pth, cur_freq = cur
|
||||
cpu_scaled: float = int(cur_freq) / int(max_freq)
|
||||
|
||||
if cpu_scaled != 1.:
|
||||
return 1. / (
|
||||
cpu_scaled * 2 # <- bc likely "dual threaded"
|
||||
)
|
||||
|
||||
return 1.
|
||||
|
||||
|
||||
def pytest_addoption(
|
||||
parser: pytest.Parser,
|
||||
):
|
||||
|
|
|
|||
|
|
@ -126,7 +126,7 @@ def test_shield_pause(
|
|||
child.pid,
|
||||
signal.SIGINT,
|
||||
)
|
||||
from tractor._supervise import _shutdown_msg
|
||||
from tractor.runtime._supervise import _shutdown_msg
|
||||
expect(
|
||||
child,
|
||||
# 'Shutting down actor runtime',
|
||||
|
|
|
|||
|
|
@ -8,17 +8,16 @@ from pathlib import Path
|
|||
import pytest
|
||||
import trio
|
||||
import tractor
|
||||
from tractor import (
|
||||
Actor,
|
||||
_state,
|
||||
_addr,
|
||||
)
|
||||
from tractor import Actor
|
||||
from tractor.runtime import _state
|
||||
from tractor.discovery import _addr
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bindspace_dir_str() -> str:
|
||||
|
||||
rt_dir: Path = tractor._state.get_rt_dir()
|
||||
from tractor.runtime._state import get_rt_dir
|
||||
rt_dir: Path = get_rt_dir()
|
||||
bs_dir: Path = rt_dir / 'doggy'
|
||||
bs_dir_str: str = str(bs_dir)
|
||||
assert not bs_dir.is_dir()
|
||||
|
|
|
|||
|
|
@ -13,9 +13,9 @@ from tractor import (
|
|||
Portal,
|
||||
ipc,
|
||||
msg,
|
||||
_state,
|
||||
_addr,
|
||||
)
|
||||
from tractor.runtime import _state
|
||||
from tractor.discovery import _addr
|
||||
|
||||
@tractor.context
|
||||
async def chk_tpts(
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ async def maybe_expect_raises(
|
|||
Async wrapper for ensuring errors propagate from the inner scope.
|
||||
|
||||
'''
|
||||
if tractor._state.debug_mode():
|
||||
if tractor.debug_mode():
|
||||
timeout += 999
|
||||
|
||||
with trio.fail_after(timeout):
|
||||
|
|
|
|||
|
|
@ -490,7 +490,7 @@ def test_cancel_via_SIGINT(
|
|||
"""Ensure that a control-C (SIGINT) signal cancels both the parent and
|
||||
child processes in trionic fashion
|
||||
"""
|
||||
pid = os.getpid()
|
||||
pid: int = os.getpid()
|
||||
|
||||
async def main():
|
||||
with trio.fail_after(2):
|
||||
|
|
@ -517,6 +517,8 @@ def test_cancel_via_SIGINT_other_task(
|
|||
started from a seperate ``trio`` child task.
|
||||
|
||||
'''
|
||||
from .conftest import cpu_scaling_factor
|
||||
|
||||
pid: int = os.getpid()
|
||||
timeout: float = (
|
||||
4 if _non_linux
|
||||
|
|
@ -525,6 +527,11 @@ def test_cancel_via_SIGINT_other_task(
|
|||
if _friggin_windows: # smh
|
||||
timeout += 1
|
||||
|
||||
# add latency headroom for CPU freq scaling (auto-cpufreq et al.)
|
||||
headroom: float = cpu_scaling_factor()
|
||||
if headroom != 1.:
|
||||
timeout *= headroom
|
||||
|
||||
async def spawn_and_sleep_forever(
|
||||
task_status=trio.TASK_STATUS_IGNORED
|
||||
):
|
||||
|
|
|
|||
|
|
@ -10,7 +10,20 @@ from tractor._testing import tractor_test
|
|||
MESSAGE = 'tractoring at full speed'
|
||||
|
||||
|
||||
def test_empty_mngrs_input_raises() -> None:
|
||||
def test_empty_mngrs_input_raises(
|
||||
tpt_proto: str,
|
||||
) -> None:
|
||||
# TODO, the `open_actor_cluster()` teardown hangs
|
||||
# intermittently on UDS when `gather_contexts(mngrs=())`
|
||||
# raises `ValueError` mid-setup; likely a race in the
|
||||
# actor-nursery cleanup vs UDS socket shutdown. Needs
|
||||
# a deeper look at `._clustering`/`._supervise` teardown
|
||||
# paths with the UDS transport.
|
||||
if tpt_proto == 'uds':
|
||||
pytest.skip(
|
||||
'actor-cluster teardown hangs intermittently on UDS'
|
||||
)
|
||||
|
||||
async def main():
|
||||
with trio.fail_after(3):
|
||||
async with (
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ from tractor._exceptions import (
|
|||
StreamOverrun,
|
||||
ContextCancelled,
|
||||
)
|
||||
from tractor._state import current_ipc_ctx
|
||||
from tractor.runtime._state import current_ipc_ctx
|
||||
|
||||
from tractor._testing import (
|
||||
tractor_test,
|
||||
|
|
@ -939,7 +939,7 @@ def test_one_end_stream_not_opened(
|
|||
|
||||
'''
|
||||
overrunner, buf_size_increase, entrypoint = overrun_by
|
||||
from tractor._runtime import Actor
|
||||
from tractor.runtime._runtime import Actor
|
||||
buf_size = buf_size_increase + Actor.msg_buffer_size
|
||||
|
||||
timeout: float = (
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
"""
|
||||
Discovery subsys.
|
||||
'''
|
||||
Discovery subsystem via a "registrar" actor scenarios.
|
||||
|
||||
"""
|
||||
'''
|
||||
import os
|
||||
import signal
|
||||
import platform
|
||||
|
|
@ -24,7 +24,7 @@ async def test_reg_then_unreg(
|
|||
reg_addr: tuple,
|
||||
):
|
||||
actor = tractor.current_actor()
|
||||
assert actor.is_arbiter
|
||||
assert actor.is_registrar
|
||||
assert len(actor._registry) == 1 # only self is registered
|
||||
|
||||
async with tractor.open_nursery(
|
||||
|
|
@ -35,7 +35,7 @@ async def test_reg_then_unreg(
|
|||
uid = portal.channel.aid.uid
|
||||
|
||||
async with tractor.get_registry(reg_addr) as aportal:
|
||||
# this local actor should be the arbiter
|
||||
# this local actor should be the registrar
|
||||
assert actor is aportal.actor
|
||||
|
||||
async with tractor.wait_for_actor('actor'):
|
||||
|
|
@ -154,7 +154,7 @@ async def unpack_reg(
|
|||
actor_or_portal: tractor.Portal|tractor.Actor,
|
||||
):
|
||||
'''
|
||||
Get and unpack a "registry" RPC request from the "arbiter" registry
|
||||
Get and unpack a "registry" RPC request from the registrar
|
||||
system.
|
||||
|
||||
'''
|
||||
|
|
@ -163,7 +163,10 @@ async def unpack_reg(
|
|||
else:
|
||||
msg = await actor_or_portal.run_from_ns('self', 'get_registry')
|
||||
|
||||
return {tuple(key.split('.')): val for key, val in msg.items()}
|
||||
return {
|
||||
tuple(key.split('.')): val
|
||||
for key, val in msg.items()
|
||||
}
|
||||
|
||||
|
||||
async def spawn_and_check_registry(
|
||||
|
|
@ -194,15 +197,15 @@ async def spawn_and_check_registry(
|
|||
actor = tractor.current_actor()
|
||||
|
||||
if remote_arbiter:
|
||||
assert not actor.is_arbiter
|
||||
assert not actor.is_registrar
|
||||
|
||||
if actor.is_arbiter:
|
||||
extra = 1 # arbiter is local root actor
|
||||
if actor.is_registrar:
|
||||
extra = 1 # registrar is local root actor
|
||||
get_reg = partial(unpack_reg, actor)
|
||||
|
||||
else:
|
||||
get_reg = partial(unpack_reg, portal)
|
||||
extra = 2 # local root actor + remote arbiter
|
||||
extra = 2 # local root actor + remote registrar
|
||||
|
||||
# ensure current actor is registered
|
||||
registry: dict = await get_reg()
|
||||
|
|
@ -282,7 +285,7 @@ def test_subactors_unregister_on_cancel(
|
|||
):
|
||||
'''
|
||||
Verify that cancelling a nursery results in all subactors
|
||||
deregistering themselves with the arbiter.
|
||||
deregistering themselves with the registrar.
|
||||
|
||||
'''
|
||||
with pytest.raises(KeyboardInterrupt):
|
||||
|
|
@ -311,7 +314,7 @@ def test_subactors_unregister_on_cancel_remote_daemon(
|
|||
'''
|
||||
Verify that cancelling a nursery results in all subactors
|
||||
deregistering themselves with a **remote** (not in the local
|
||||
process tree) arbiter.
|
||||
process tree) registrar.
|
||||
|
||||
'''
|
||||
with pytest.raises(KeyboardInterrupt):
|
||||
|
|
@ -356,20 +359,24 @@ async def close_chans_before_nursery(
|
|||
try:
|
||||
get_reg = partial(unpack_reg, aportal)
|
||||
|
||||
async with tractor.open_nursery() as tn:
|
||||
portal1 = await tn.start_actor(
|
||||
name='consumer1', enable_modules=[__name__])
|
||||
portal2 = await tn.start_actor(
|
||||
'consumer2', enable_modules=[__name__])
|
||||
async with tractor.open_nursery() as an:
|
||||
portal1 = await an.start_actor(
|
||||
name='consumer1',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
portal2 = await an.start_actor(
|
||||
'consumer2',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
||||
# TODO: compact this back as was in last commit once
|
||||
# 3.9+, see https://github.com/goodboy/tractor/issues/207
|
||||
async with portal1.open_stream_from(
|
||||
stream_forever
|
||||
) as agen1:
|
||||
async with portal2.open_stream_from(
|
||||
async with (
|
||||
portal1.open_stream_from(
|
||||
stream_forever
|
||||
) as agen2:
|
||||
) as agen1,
|
||||
portal2.open_stream_from(
|
||||
stream_forever
|
||||
) as agen2,
|
||||
):
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
|
|
@ -380,7 +387,7 @@ async def close_chans_before_nursery(
|
|||
await streamer(agen2)
|
||||
finally:
|
||||
# Kill the root nursery thus resulting in
|
||||
# normal arbiter channel ops to fail during
|
||||
# normal registrar channel ops to fail during
|
||||
# teardown. It doesn't seem like this is
|
||||
# reliably triggered by an external SIGINT.
|
||||
# tractor.current_actor()._root_nursery.cancel_scope.cancel()
|
||||
|
|
@ -392,6 +399,7 @@ async def close_chans_before_nursery(
|
|||
# also kill off channels cuz why not
|
||||
await agen1.aclose()
|
||||
await agen2.aclose()
|
||||
|
||||
finally:
|
||||
with trio.CancelScope(shield=True):
|
||||
await trio.sleep(1)
|
||||
|
|
@ -412,7 +420,7 @@ def test_close_channel_explicit(
|
|||
'''
|
||||
Verify that closing a stream explicitly and killing the actor's
|
||||
"root nursery" **before** the containing nursery tears down also
|
||||
results in subactor(s) deregistering from the arbiter.
|
||||
results in subactor(s) deregistering from the registrar.
|
||||
|
||||
'''
|
||||
with pytest.raises(KeyboardInterrupt):
|
||||
|
|
@ -427,7 +435,7 @@ def test_close_channel_explicit(
|
|||
|
||||
|
||||
@pytest.mark.parametrize('use_signal', [False, True])
|
||||
def test_close_channel_explicit_remote_arbiter(
|
||||
def test_close_channel_explicit_remote_registrar(
|
||||
daemon: subprocess.Popen,
|
||||
start_method: str,
|
||||
use_signal: bool,
|
||||
|
|
@ -436,7 +444,7 @@ def test_close_channel_explicit_remote_arbiter(
|
|||
'''
|
||||
Verify that closing a stream explicitly and killing the actor's
|
||||
"root nursery" **before** the containing nursery tears down also
|
||||
results in subactor(s) deregistering from the arbiter.
|
||||
results in subactor(s) deregistering from the registrar.
|
||||
|
||||
'''
|
||||
with pytest.raises(KeyboardInterrupt):
|
||||
|
|
@ -448,3 +456,65 @@ def test_close_channel_explicit_remote_arbiter(
|
|||
remote_arbiter=True,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def kill_transport(
|
||||
ctx: tractor.Context,
|
||||
) -> None:
|
||||
|
||||
await ctx.started()
|
||||
actor: tractor.Actor = tractor.current_actor()
|
||||
actor.ipc_server.cancel()
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
||||
|
||||
# @pytest.mark.parametrize('use_signal', [False, True])
|
||||
def test_stale_entry_is_deleted(
|
||||
debug_mode: bool,
|
||||
daemon: subprocess.Popen,
|
||||
start_method: str,
|
||||
reg_addr: tuple,
|
||||
):
|
||||
'''
|
||||
Ensure that when a stale entry is detected in the registrar's
|
||||
table that the `find_actor()` API takes care of deleting the
|
||||
stale entry and not delivering a bad portal.
|
||||
|
||||
'''
|
||||
async def main():
|
||||
|
||||
name: str = 'transport_fails_actor'
|
||||
_reg_ptl: tractor.Portal
|
||||
an: tractor.ActorNursery
|
||||
async with (
|
||||
tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
registry_addrs=[reg_addr],
|
||||
) as an,
|
||||
tractor.get_registry(reg_addr) as _reg_ptl,
|
||||
):
|
||||
ptl: tractor.Portal = await an.start_actor(
|
||||
name,
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
async with ptl.open_context(
|
||||
kill_transport,
|
||||
) as (first, ctx):
|
||||
async with tractor.find_actor(
|
||||
name,
|
||||
registry_addrs=[reg_addr],
|
||||
) as maybe_portal:
|
||||
# because the transitive
|
||||
# `._discovery.maybe_open_portal()` call should
|
||||
# fail and implicitly call `.delete_addr()`
|
||||
assert maybe_portal is None
|
||||
registry: dict = await unpack_reg(_reg_ptl)
|
||||
assert ptl.chan.aid.uid not in registry
|
||||
|
||||
# should fail since we knocked out the IPC tpt XD
|
||||
await ptl.cancel_actor()
|
||||
await an.cancel()
|
||||
|
||||
trio.run(main)
|
||||
|
|
|
|||
|
|
@ -94,8 +94,10 @@ def run_example_in_subproc(
|
|||
for f in p[2]
|
||||
|
||||
if (
|
||||
'__' not in f
|
||||
and f[0] != '_'
|
||||
'__' not in f # ignore any pkg-mods
|
||||
# ignore any `__pycache__` subdir
|
||||
and '__pycache__' not in str(p[0])
|
||||
and f[0] != '_' # ignore any WIP "examplel mods"
|
||||
and 'debugging' not in p[0]
|
||||
and 'integration' not in p[0]
|
||||
and 'advanced_faults' not in p[0]
|
||||
|
|
@ -143,12 +145,19 @@ def test_example(
|
|||
'This test does run just fine "in person" however..'
|
||||
)
|
||||
|
||||
from .conftest import cpu_scaling_factor
|
||||
|
||||
timeout: float = (
|
||||
60
|
||||
if ci_env and _non_linux
|
||||
else 16
|
||||
)
|
||||
|
||||
# add latency headroom for CPU freq scaling (auto-cpufreq et al.)
|
||||
headroom: float = cpu_scaling_factor()
|
||||
if headroom != 1.:
|
||||
timeout *= headroom
|
||||
|
||||
with open(ex_file, 'r') as ex:
|
||||
code = ex.read()
|
||||
|
||||
|
|
|
|||
|
|
@ -26,8 +26,8 @@ from tractor import (
|
|||
to_asyncio,
|
||||
RemoteActorError,
|
||||
ContextCancelled,
|
||||
_state,
|
||||
)
|
||||
from tractor.runtime import _state
|
||||
from tractor.trionics import BroadcastReceiver
|
||||
from tractor._testing import expect_ctxc
|
||||
|
||||
|
|
|
|||
|
|
@ -201,7 +201,7 @@ async def stream_from_peer(
|
|||
) -> None:
|
||||
|
||||
# sanity
|
||||
assert tractor._state.debug_mode() == debug_mode
|
||||
assert tractor.debug_mode() == debug_mode
|
||||
|
||||
peer: Portal
|
||||
try:
|
||||
|
|
@ -841,7 +841,7 @@ async def serve_subactors(
|
|||
async with open_nursery() as an:
|
||||
|
||||
# sanity
|
||||
assert tractor._state.debug_mode() == debug_mode
|
||||
assert tractor.debug_mode() == debug_mode
|
||||
|
||||
await ctx.started(peer_name)
|
||||
async with ctx.open_stream() as ipc:
|
||||
|
|
@ -880,7 +880,7 @@ async def client_req_subactor(
|
|||
) -> None:
|
||||
# sanity
|
||||
if debug_mode:
|
||||
assert tractor._state.debug_mode()
|
||||
assert tractor.debug_mode()
|
||||
|
||||
# TODO: other cases to do with sub lifetimes:
|
||||
# -[ ] test that we can have the server spawn a sub
|
||||
|
|
|
|||
|
|
@ -300,19 +300,43 @@ def test_a_quadruple_example(
|
|||
time_quad_ex: tuple[list[int], float],
|
||||
ci_env: bool,
|
||||
spawn_backend: str,
|
||||
test_log: tractor.log.StackLevelAdapter,
|
||||
):
|
||||
'''
|
||||
This also serves as a kind of "we'd like to be this fast test".
|
||||
This also serves as a "we'd like to be this fast" smoke test
|
||||
given past empirical eval of this suite.
|
||||
|
||||
'''
|
||||
non_linux: bool = (_sys := platform.system()) != 'Linux'
|
||||
|
||||
results, diff = time_quad_ex
|
||||
assert results
|
||||
this_fast_on_linux: float = 3
|
||||
this_fast = (
|
||||
6 if non_linux
|
||||
else 3
|
||||
else this_fast_on_linux
|
||||
)
|
||||
# ^ XXX NOTE,
|
||||
# i've noticed that tweaking the CPU governor setting
|
||||
# to not "always" enable "turbo" mode can result in latency
|
||||
# which causes this limit to be too little. Not sure if it'd
|
||||
# be worth it to adjust the linux value based on reading the
|
||||
# CPU conf from the sys?
|
||||
#
|
||||
# For ex, see the `auto-cpufreq` docs on such settings,
|
||||
# https://github.com/AdnanHodzic/auto-cpufreq?tab=readme-ov-file#example-config-file-contents
|
||||
#
|
||||
# HENCE this below latency-headroom compensation logic..
|
||||
from .conftest import cpu_scaling_factor
|
||||
headroom: float = cpu_scaling_factor()
|
||||
if headroom != 1.:
|
||||
this_fast = this_fast_on_linux * headroom
|
||||
test_log.warning(
|
||||
f'Adding latency headroom on linux bc CPU scaling,\n'
|
||||
f'headroom: {headroom}\n'
|
||||
f'this_fast_on_linux: {this_fast_on_linux} -> {this_fast}\n'
|
||||
)
|
||||
|
||||
results, diff = time_quad_ex
|
||||
assert results
|
||||
assert diff < this_fast
|
||||
|
||||
|
||||
|
|
@ -353,7 +377,7 @@ def test_not_fast_enough_quad(
|
|||
assert results is None
|
||||
|
||||
|
||||
@tractor_test
|
||||
@tractor_test(timeout=20)
|
||||
async def test_respawn_consumer_task(
|
||||
reg_addr: tuple,
|
||||
spawn_backend: str,
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
"""
|
||||
Arbiter and "local" actor api
|
||||
Registrar and "local" actor api
|
||||
"""
|
||||
import time
|
||||
|
||||
|
|
@ -12,11 +12,11 @@ from tractor._testing import tractor_test
|
|||
|
||||
@pytest.mark.trio
|
||||
async def test_no_runtime():
|
||||
"""An arbitter must be established before any nurseries
|
||||
"""A registrar must be established before any nurseries
|
||||
can be created.
|
||||
|
||||
(In other words ``tractor.open_root_actor()`` must be engaged at
|
||||
some point?)
|
||||
(In other words ``tractor.open_root_actor()`` must be
|
||||
engaged at some point?)
|
||||
"""
|
||||
with pytest.raises(RuntimeError) :
|
||||
async with tractor.find_actor('doggy'):
|
||||
|
|
@ -25,9 +25,9 @@ async def test_no_runtime():
|
|||
|
||||
@tractor_test
|
||||
async def test_self_is_registered(reg_addr):
|
||||
"Verify waiting on the arbiter to register itself using the standard api."
|
||||
"Verify waiting on the registrar to register itself using the standard api."
|
||||
actor = tractor.current_actor()
|
||||
assert actor.is_arbiter
|
||||
assert actor.is_registrar
|
||||
with trio.fail_after(0.2):
|
||||
async with tractor.wait_for_actor('root') as portal:
|
||||
assert portal.channel.uid[0] == 'root'
|
||||
|
|
@ -35,11 +35,11 @@ async def test_self_is_registered(reg_addr):
|
|||
|
||||
@tractor_test
|
||||
async def test_self_is_registered_localportal(reg_addr):
|
||||
"Verify waiting on the arbiter to register itself using a local portal."
|
||||
"Verify waiting on the registrar to register itself using a local portal."
|
||||
actor = tractor.current_actor()
|
||||
assert actor.is_arbiter
|
||||
assert actor.is_registrar
|
||||
async with tractor.get_registry(reg_addr) as portal:
|
||||
assert isinstance(portal, tractor._portal.LocalPortal)
|
||||
assert isinstance(portal, tractor.runtime._portal.LocalPortal)
|
||||
|
||||
with trio.fail_after(0.2):
|
||||
sockaddr = await portal.run_from_ns(
|
||||
|
|
@ -57,8 +57,8 @@ def test_local_actor_async_func(reg_addr):
|
|||
async with tractor.open_root_actor(
|
||||
registry_addrs=[reg_addr],
|
||||
):
|
||||
# arbiter is started in-proc if dne
|
||||
assert tractor.current_actor().is_arbiter
|
||||
# registrar is started in-proc if dne
|
||||
assert tractor.current_actor().is_registrar
|
||||
|
||||
for i in range(10):
|
||||
nums.append(i)
|
||||
|
|
|
|||
|
|
@ -17,11 +17,11 @@ from tractor._testing import (
|
|||
)
|
||||
from tractor import (
|
||||
current_actor,
|
||||
_state,
|
||||
Actor,
|
||||
Context,
|
||||
Portal,
|
||||
)
|
||||
from tractor.runtime import _state
|
||||
from .conftest import (
|
||||
sig_prog,
|
||||
_INT_SIGNAL,
|
||||
|
|
@ -30,7 +30,7 @@ from .conftest import (
|
|||
|
||||
if TYPE_CHECKING:
|
||||
from tractor.msg import Aid
|
||||
from tractor._addr import (
|
||||
from tractor.discovery._addr import (
|
||||
UnwrappedAddress,
|
||||
)
|
||||
|
||||
|
|
@ -53,19 +53,19 @@ def test_abort_on_sigint(
|
|||
|
||||
|
||||
@tractor_test
|
||||
async def test_cancel_remote_arbiter(
|
||||
async def test_cancel_remote_registrar(
|
||||
daemon: subprocess.Popen,
|
||||
reg_addr: UnwrappedAddress,
|
||||
):
|
||||
assert not current_actor().is_arbiter
|
||||
assert not current_actor().is_registrar
|
||||
async with tractor.get_registry(reg_addr) as portal:
|
||||
await portal.cancel_actor()
|
||||
|
||||
time.sleep(0.1)
|
||||
# the arbiter channel server is cancelled but not its main task
|
||||
# the registrar channel server is cancelled but not its main task
|
||||
assert daemon.returncode is None
|
||||
|
||||
# no arbiter socket should exist
|
||||
# no registrar socket should exist
|
||||
with pytest.raises(OSError):
|
||||
async with tractor.get_registry(reg_addr) as portal:
|
||||
pass
|
||||
|
|
@ -80,7 +80,7 @@ def test_register_duplicate_name(
|
|||
registry_addrs=[reg_addr],
|
||||
) as an:
|
||||
|
||||
assert not current_actor().is_arbiter
|
||||
assert not current_actor().is_registrar
|
||||
|
||||
p1 = await an.start_actor('doggy')
|
||||
p2 = await an.start_actor('doggy')
|
||||
|
|
@ -122,7 +122,7 @@ async def get_root_portal(
|
|||
|
||||
# connect back to our immediate parent which should also
|
||||
# be the actor-tree's root.
|
||||
from tractor._discovery import get_root
|
||||
from tractor.discovery._discovery import get_root
|
||||
ptl: Portal
|
||||
async with get_root() as ptl:
|
||||
root_aid: Aid = ptl.chan.aid
|
||||
|
|
|
|||
|
|
@ -0,0 +1,333 @@
|
|||
'''
|
||||
Verify that externally registered remote actor error
|
||||
types are correctly relayed, boxed, and re-raised across
|
||||
IPC actor hops via `reg_err_types()`.
|
||||
|
||||
Also ensure that when custom error types are NOT registered
|
||||
the framework indicates the lookup failure to the user.
|
||||
|
||||
'''
|
||||
import pytest
|
||||
import trio
|
||||
import tractor
|
||||
from tractor import (
|
||||
Context,
|
||||
Portal,
|
||||
RemoteActorError,
|
||||
)
|
||||
from tractor._exceptions import (
|
||||
get_err_type,
|
||||
reg_err_types,
|
||||
)
|
||||
|
||||
|
||||
# -- custom app-level errors for testing --
|
||||
class CustomAppError(Exception):
|
||||
'''
|
||||
A hypothetical user-app error that should be
|
||||
boxed+relayed by `tractor` IPC when registered.
|
||||
|
||||
'''
|
||||
|
||||
|
||||
class AnotherAppError(Exception):
|
||||
'''
|
||||
A second custom error for multi-type registration.
|
||||
|
||||
'''
|
||||
|
||||
|
||||
class UnregisteredAppError(Exception):
|
||||
'''
|
||||
A custom error that is intentionally NEVER
|
||||
registered via `reg_err_types()` so we can
|
||||
verify the framework's failure indication.
|
||||
|
||||
'''
|
||||
|
||||
|
||||
# -- remote-task endpoints --
|
||||
@tractor.context
|
||||
async def raise_custom_err(
|
||||
ctx: Context,
|
||||
) -> None:
|
||||
'''
|
||||
Remote ep that raises a `CustomAppError`
|
||||
after sync-ing with the caller.
|
||||
|
||||
'''
|
||||
await ctx.started()
|
||||
raise CustomAppError(
|
||||
'the app exploded remotely'
|
||||
)
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def raise_another_err(
|
||||
ctx: Context,
|
||||
) -> None:
|
||||
'''
|
||||
Remote ep that raises `AnotherAppError`.
|
||||
|
||||
'''
|
||||
await ctx.started()
|
||||
raise AnotherAppError(
|
||||
'another app-level kaboom'
|
||||
)
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def raise_unreg_err(
|
||||
ctx: Context,
|
||||
) -> None:
|
||||
'''
|
||||
Remote ep that raises an `UnregisteredAppError`
|
||||
which has NOT been `reg_err_types()`-registered.
|
||||
|
||||
'''
|
||||
await ctx.started()
|
||||
raise UnregisteredAppError(
|
||||
'this error type is unknown to tractor'
|
||||
)
|
||||
|
||||
|
||||
# -- unit tests for the type-registry plumbing --
|
||||
|
||||
class TestRegErrTypesPlumbing:
|
||||
'''
|
||||
Low-level checks on `reg_err_types()` and
|
||||
`get_err_type()` without requiring IPC.
|
||||
|
||||
'''
|
||||
|
||||
def test_unregistered_type_returns_none(self):
|
||||
'''
|
||||
An unregistered custom error name should yield
|
||||
`None` from `get_err_type()`.
|
||||
|
||||
'''
|
||||
result = get_err_type('CustomAppError')
|
||||
assert result is None
|
||||
|
||||
def test_register_and_lookup(self):
|
||||
'''
|
||||
After `reg_err_types()`, the custom type should
|
||||
be discoverable via `get_err_type()`.
|
||||
|
||||
'''
|
||||
reg_err_types([CustomAppError])
|
||||
result = get_err_type('CustomAppError')
|
||||
assert result is CustomAppError
|
||||
|
||||
def test_register_multiple_types(self):
|
||||
'''
|
||||
Registering a list of types should make each
|
||||
one individually resolvable.
|
||||
|
||||
'''
|
||||
reg_err_types([
|
||||
CustomAppError,
|
||||
AnotherAppError,
|
||||
])
|
||||
assert (
|
||||
get_err_type('CustomAppError')
|
||||
is CustomAppError
|
||||
)
|
||||
assert (
|
||||
get_err_type('AnotherAppError')
|
||||
is AnotherAppError
|
||||
)
|
||||
|
||||
def test_builtin_types_always_resolve(self):
|
||||
'''
|
||||
Builtin error types like `RuntimeError` and
|
||||
`ValueError` should always be found without
|
||||
any prior registration.
|
||||
|
||||
'''
|
||||
assert (
|
||||
get_err_type('RuntimeError')
|
||||
is RuntimeError
|
||||
)
|
||||
assert (
|
||||
get_err_type('ValueError')
|
||||
is ValueError
|
||||
)
|
||||
|
||||
def test_tractor_native_types_resolve(self):
|
||||
'''
|
||||
`tractor`-internal exc types (e.g.
|
||||
`ContextCancelled`) should always resolve.
|
||||
|
||||
'''
|
||||
assert (
|
||||
get_err_type('ContextCancelled')
|
||||
is tractor.ContextCancelled
|
||||
)
|
||||
|
||||
def test_boxed_type_str_without_ipc_msg(self):
|
||||
'''
|
||||
When a `RemoteActorError` is constructed
|
||||
without an IPC msg (and no resolvable type),
|
||||
`.boxed_type_str` should return `'<unknown>'`.
|
||||
|
||||
'''
|
||||
rae = RemoteActorError('test')
|
||||
assert rae.boxed_type_str == '<unknown>'
|
||||
|
||||
|
||||
# -- IPC-level integration tests --
|
||||
|
||||
def test_registered_custom_err_relayed(
|
||||
debug_mode: bool,
|
||||
tpt_proto: str,
|
||||
):
|
||||
'''
|
||||
When a custom error type is registered via
|
||||
`reg_err_types()` on BOTH sides of an IPC dialog,
|
||||
the parent should receive a `RemoteActorError`
|
||||
whose `.boxed_type` matches the original custom
|
||||
error type.
|
||||
|
||||
'''
|
||||
reg_err_types([CustomAppError])
|
||||
|
||||
async def main():
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
enable_transports=[tpt_proto],
|
||||
) as an:
|
||||
ptl: Portal = await an.start_actor(
|
||||
'custom-err-raiser',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
async with ptl.open_context(
|
||||
raise_custom_err,
|
||||
) as (ctx, sent):
|
||||
assert not sent
|
||||
try:
|
||||
await ctx.wait_for_result()
|
||||
except RemoteActorError as rae:
|
||||
assert rae.boxed_type is CustomAppError
|
||||
assert rae.src_type is CustomAppError
|
||||
assert 'the app exploded remotely' in str(
|
||||
rae.tb_str
|
||||
)
|
||||
raise
|
||||
|
||||
with pytest.raises(RemoteActorError) as excinfo:
|
||||
trio.run(main)
|
||||
|
||||
rae = excinfo.value
|
||||
assert rae.boxed_type is CustomAppError
|
||||
|
||||
|
||||
def test_registered_another_err_relayed(
|
||||
debug_mode: bool,
|
||||
tpt_proto: str,
|
||||
):
|
||||
'''
|
||||
Same as above but for a different custom error
|
||||
type to verify multi-type registration works
|
||||
end-to-end over IPC.
|
||||
|
||||
'''
|
||||
reg_err_types([AnotherAppError])
|
||||
|
||||
async def main():
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
enable_transports=[tpt_proto],
|
||||
) as an:
|
||||
ptl: Portal = await an.start_actor(
|
||||
'another-err-raiser',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
async with ptl.open_context(
|
||||
raise_another_err,
|
||||
) as (ctx, sent):
|
||||
assert not sent
|
||||
try:
|
||||
await ctx.wait_for_result()
|
||||
except RemoteActorError as rae:
|
||||
assert (
|
||||
rae.boxed_type
|
||||
is AnotherAppError
|
||||
)
|
||||
raise
|
||||
|
||||
await an.cancel()
|
||||
|
||||
with pytest.raises(RemoteActorError) as excinfo:
|
||||
trio.run(main)
|
||||
|
||||
rae = excinfo.value
|
||||
assert rae.boxed_type is AnotherAppError
|
||||
|
||||
|
||||
def test_unregistered_err_still_relayed(
|
||||
debug_mode: bool,
|
||||
tpt_proto: str,
|
||||
):
|
||||
'''
|
||||
Verify that even when a custom error type is NOT registered via
|
||||
`reg_err_types()`, the remote error is still relayed as
|
||||
a `RemoteActorError` with all string-level info preserved
|
||||
(traceback, type name, source actor uid).
|
||||
|
||||
The `.boxed_type` will be `None` (type obj can't be resolved) but
|
||||
`.boxed_type_str` and `.src_type_str` still report the original
|
||||
type name from the IPC msg.
|
||||
|
||||
This documents the expected limitation: without `reg_err_types()`
|
||||
the `.boxed_type` property can NOT resolve to the original Python
|
||||
type.
|
||||
|
||||
'''
|
||||
# NOTE: intentionally do NOT call
|
||||
# `reg_err_types([UnregisteredAppError])`
|
||||
|
||||
async def main():
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
enable_transports=[tpt_proto],
|
||||
) as an:
|
||||
ptl: Portal = await an.start_actor(
|
||||
'unreg-err-raiser',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
async with ptl.open_context(
|
||||
raise_unreg_err,
|
||||
) as (ctx, sent):
|
||||
assert not sent
|
||||
await ctx.wait_for_result()
|
||||
|
||||
await an.cancel()
|
||||
|
||||
with pytest.raises(RemoteActorError) as excinfo:
|
||||
trio.run(main)
|
||||
|
||||
rae = excinfo.value
|
||||
|
||||
# the error IS relayed even without
|
||||
# registration; type obj is unresolvable but
|
||||
# all string-level info is preserved.
|
||||
assert rae.boxed_type is None # NOT `UnregisteredAppError`
|
||||
assert rae.src_type is None
|
||||
|
||||
# string names survive the IPC round-trip
|
||||
# via the `Error` msg fields.
|
||||
assert (
|
||||
rae.src_type_str
|
||||
==
|
||||
'UnregisteredAppError'
|
||||
)
|
||||
assert (
|
||||
rae.boxed_type_str
|
||||
==
|
||||
'UnregisteredAppError'
|
||||
)
|
||||
|
||||
# original traceback content is preserved
|
||||
assert 'this error type is unknown' in rae.tb_str
|
||||
assert 'UnregisteredAppError' in rae.tb_str
|
||||
|
|
@ -94,15 +94,15 @@ def test_runtime_vars_unset(
|
|||
after the root actor-runtime exits!
|
||||
|
||||
'''
|
||||
assert not tractor._state._runtime_vars['_debug_mode']
|
||||
assert not tractor.runtime._state._runtime_vars['_debug_mode']
|
||||
async def main():
|
||||
assert not tractor._state._runtime_vars['_debug_mode']
|
||||
assert not tractor.runtime._state._runtime_vars['_debug_mode']
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=True,
|
||||
):
|
||||
assert tractor._state._runtime_vars['_debug_mode']
|
||||
assert tractor.runtime._state._runtime_vars['_debug_mode']
|
||||
|
||||
# after runtime closure, should be reverted!
|
||||
assert not tractor._state._runtime_vars['_debug_mode']
|
||||
assert not tractor.runtime._state._runtime_vars['_debug_mode']
|
||||
|
||||
trio.run(main)
|
||||
|
|
|
|||
|
|
@ -110,7 +110,7 @@ def test_rpc_errors(
|
|||
) as n:
|
||||
|
||||
actor = tractor.current_actor()
|
||||
assert actor.is_arbiter
|
||||
assert actor.is_registrar
|
||||
await n.run_in_actor(
|
||||
sleep_back_actor,
|
||||
actor_name=subactor_requests_to,
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ async def spawn(
|
|||
):
|
||||
# now runtime exists
|
||||
actor: tractor.Actor = tractor.current_actor()
|
||||
assert actor.is_arbiter == should_be_root
|
||||
assert actor.is_registrar == should_be_root
|
||||
|
||||
# spawns subproc here
|
||||
portal: tractor.Portal = await an.run_in_actor(
|
||||
|
|
@ -68,7 +68,7 @@ async def spawn(
|
|||
assert result == 10
|
||||
return result
|
||||
else:
|
||||
assert actor.is_arbiter == should_be_root
|
||||
assert actor.is_registrar == should_be_root
|
||||
return 10
|
||||
|
||||
|
||||
|
|
@ -181,7 +181,7 @@ def test_loglevel_propagated_to_subactor(
|
|||
|
||||
async def main():
|
||||
async with tractor.open_nursery(
|
||||
name='arbiter',
|
||||
name='registrar',
|
||||
start_method=start_method,
|
||||
arbiter_addr=reg_addr,
|
||||
|
||||
|
|
|
|||
|
|
@ -30,21 +30,23 @@ from ._streaming import (
|
|||
MsgStream as MsgStream,
|
||||
stream as stream,
|
||||
)
|
||||
from ._discovery import (
|
||||
from .discovery._discovery import (
|
||||
get_registry as get_registry,
|
||||
find_actor as find_actor,
|
||||
wait_for_actor as wait_for_actor,
|
||||
query_actor as query_actor,
|
||||
)
|
||||
from ._supervise import (
|
||||
from .runtime._supervise import (
|
||||
open_nursery as open_nursery,
|
||||
ActorNursery as ActorNursery,
|
||||
)
|
||||
from ._state import (
|
||||
from .runtime._state import (
|
||||
RuntimeVars as RuntimeVars,
|
||||
current_actor as current_actor,
|
||||
is_root_process as is_root_process,
|
||||
current_ipc_ctx as current_ipc_ctx,
|
||||
debug_mode as debug_mode
|
||||
debug_mode as debug_mode,
|
||||
get_runtime_vars as get_runtime_vars,
|
||||
is_root_process as is_root_process,
|
||||
)
|
||||
from ._exceptions import (
|
||||
ContextCancelled as ContextCancelled,
|
||||
|
|
@ -65,6 +67,10 @@ from ._root import (
|
|||
open_root_actor as open_root_actor,
|
||||
)
|
||||
from .ipc import Channel as Channel
|
||||
from ._portal import Portal as Portal
|
||||
from ._runtime import Actor as Actor
|
||||
from .runtime._portal import Portal as Portal
|
||||
from .runtime._runtime import Actor as Actor
|
||||
from .discovery._registry import (
|
||||
Registrar as Registrar,
|
||||
Arbiter as Arbiter,
|
||||
)
|
||||
# from . import hilevel as hilevel
|
||||
|
|
|
|||
|
|
@ -22,8 +22,8 @@ import argparse
|
|||
|
||||
from ast import literal_eval
|
||||
|
||||
from ._runtime import Actor
|
||||
from ._entry import _trio_main
|
||||
from .runtime._runtime import Actor
|
||||
from .spawn._entry import _trio_main
|
||||
|
||||
|
||||
def parse_uid(arg):
|
||||
|
|
|
|||
|
|
@ -97,7 +97,7 @@ from ._streaming import (
|
|||
MsgStream,
|
||||
open_stream_from_ctx,
|
||||
)
|
||||
from ._state import (
|
||||
from .runtime._state import (
|
||||
current_actor,
|
||||
debug_mode,
|
||||
_ctxvar_Context,
|
||||
|
|
@ -107,8 +107,8 @@ from .trionics import (
|
|||
)
|
||||
# ------ - ------
|
||||
if TYPE_CHECKING:
|
||||
from ._portal import Portal
|
||||
from ._runtime import Actor
|
||||
from .runtime._portal import Portal
|
||||
from .runtime._runtime import Actor
|
||||
from .ipc._transport import MsgTransport
|
||||
from .devx._frame_stack import (
|
||||
CallerInfo,
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ from msgspec import (
|
|||
ValidationError,
|
||||
)
|
||||
|
||||
from tractor._state import current_actor
|
||||
from tractor.runtime._state import current_actor
|
||||
from tractor.log import get_logger
|
||||
from tractor.msg import (
|
||||
Error,
|
||||
|
|
@ -187,7 +187,31 @@ _body_fields: list[str] = list(
|
|||
)
|
||||
|
||||
|
||||
def get_err_type(type_name: str) -> BaseException|None:
|
||||
def reg_err_types(
|
||||
exc_types: list[Type[Exception]],
|
||||
) -> None:
|
||||
'''
|
||||
Register custom exception types for local lookup.
|
||||
|
||||
Such that error types can be registered by an external
|
||||
`tractor`-use-app code base which are expected to be raised
|
||||
remotely; enables them being re-raised on the receiver side of
|
||||
some inter-actor IPC dialog.
|
||||
|
||||
'''
|
||||
for exc_type in exc_types:
|
||||
log.debug(
|
||||
f'Register custom exception,\n'
|
||||
f'{exc_type!r}\n'
|
||||
)
|
||||
setattr(
|
||||
_this_mod,
|
||||
exc_type.__name__,
|
||||
exc_type,
|
||||
)
|
||||
|
||||
|
||||
def get_err_type(type_name: str) -> Type[BaseException]|None:
|
||||
'''
|
||||
Look up an exception type by name from the set of locally known
|
||||
namespaces:
|
||||
|
|
@ -301,7 +325,8 @@ class RemoteActorError(Exception):
|
|||
# also pertains to our long long oustanding issue XD
|
||||
# https://github.com/goodboy/tractor/issues/5
|
||||
self._boxed_type: BaseException = boxed_type
|
||||
self._src_type: BaseException|None = None
|
||||
self._src_type: Type[BaseException]|None = None
|
||||
self._src_type_resolved: bool = False
|
||||
self._ipc_msg: Error|None = ipc_msg
|
||||
self._extra_msgdata = extra_msgdata
|
||||
|
||||
|
|
@ -410,24 +435,41 @@ class RemoteActorError(Exception):
|
|||
return self._ipc_msg.src_type_str
|
||||
|
||||
@property
|
||||
def src_type(self) -> str:
|
||||
def src_type(self) -> Type[BaseException]|None:
|
||||
'''
|
||||
Error type raised by original remote faulting actor.
|
||||
Error type raised by original remote faulting
|
||||
actor.
|
||||
|
||||
When the error has only been relayed a single actor-hop
|
||||
this will be the same as the `.boxed_type`.
|
||||
When the error has only been relayed a single
|
||||
actor-hop this will be the same as
|
||||
`.boxed_type`.
|
||||
|
||||
If the type can not be resolved locally (i.e.
|
||||
it was not registered via `reg_err_types()`)
|
||||
a warning is logged and `None` is returned;
|
||||
all string-level error info (`.src_type_str`,
|
||||
`.tb_str`, etc.) remains available.
|
||||
|
||||
'''
|
||||
if self._src_type is None:
|
||||
if not self._src_type_resolved:
|
||||
self._src_type_resolved = True
|
||||
|
||||
if self._ipc_msg is None:
|
||||
return None
|
||||
|
||||
self._src_type = get_err_type(
|
||||
self._ipc_msg.src_type_str
|
||||
)
|
||||
|
||||
if not self._src_type:
|
||||
raise TypeError(
|
||||
f'Failed to lookup src error type with '
|
||||
f'`tractor._exceptions.get_err_type()` :\n'
|
||||
f'{self.src_type_str}'
|
||||
log.warning(
|
||||
f'Failed to lookup src error type via\n'
|
||||
f'`tractor._exceptions.get_err_type()`:\n'
|
||||
f'\n'
|
||||
f'`{self._ipc_msg.src_type_str}`'
|
||||
f' is not registered!\n'
|
||||
f'\n'
|
||||
f'Call `reg_err_types()` to enable'
|
||||
f' full type reconstruction.\n'
|
||||
)
|
||||
|
||||
return self._src_type
|
||||
|
|
@ -435,20 +477,30 @@ class RemoteActorError(Exception):
|
|||
@property
|
||||
def boxed_type_str(self) -> str:
|
||||
'''
|
||||
String-name of the (last hop's) boxed error type.
|
||||
String-name of the (last hop's) boxed error
|
||||
type.
|
||||
|
||||
Falls back to the IPC-msg-encoded type-name
|
||||
str when the type can not be resolved locally
|
||||
(e.g. unregistered custom errors).
|
||||
|
||||
'''
|
||||
# TODO, maybe support also serializing the
|
||||
# `ExceptionGroup.exeptions: list[BaseException]` set under
|
||||
# certain conditions?
|
||||
# `ExceptionGroup.exceptions: list[BaseException]`
|
||||
# set under certain conditions?
|
||||
bt: Type[BaseException] = self.boxed_type
|
||||
if bt:
|
||||
return str(bt.__name__)
|
||||
|
||||
return ''
|
||||
# fallback to the str name from the IPC msg
|
||||
# when the type obj can't be resolved.
|
||||
if self._ipc_msg:
|
||||
return self._ipc_msg.boxed_type_str
|
||||
|
||||
return '<unknown>'
|
||||
|
||||
@property
|
||||
def boxed_type(self) -> Type[BaseException]:
|
||||
def boxed_type(self) -> Type[BaseException]|None:
|
||||
'''
|
||||
Error type boxed by last actor IPC hop.
|
||||
|
||||
|
|
@ -677,10 +729,22 @@ class RemoteActorError(Exception):
|
|||
failing actor's remote env.
|
||||
|
||||
'''
|
||||
# TODO: better tb insertion and all the fancier dunder
|
||||
# metadata stuff as per `.__context__` etc. and friends:
|
||||
# TODO: better tb insertion and all the fancier
|
||||
# dunder metadata stuff as per `.__context__`
|
||||
# etc. and friends:
|
||||
# https://github.com/python-trio/trio/issues/611
|
||||
src_type_ref: Type[BaseException] = self.src_type
|
||||
src_type_ref: Type[BaseException]|None = (
|
||||
self.src_type
|
||||
)
|
||||
if src_type_ref is None:
|
||||
# unresolvable type: fall back to
|
||||
# a `RuntimeError` preserving original
|
||||
# traceback + type name.
|
||||
return RuntimeError(
|
||||
f'{self.src_type_str}: '
|
||||
f'{self.tb_str}'
|
||||
)
|
||||
|
||||
return src_type_ref(self.tb_str)
|
||||
|
||||
# TODO: local recontruction of nested inception for a given
|
||||
|
|
@ -1209,14 +1273,31 @@ def unpack_error(
|
|||
if not isinstance(msg, Error):
|
||||
return None
|
||||
|
||||
# try to lookup a suitable error type from the local runtime
|
||||
# env then use it to construct a local instance.
|
||||
# boxed_type_str: str = error_dict['boxed_type_str']
|
||||
# try to lookup a suitable error type from the
|
||||
# local runtime env then use it to construct a
|
||||
# local instance.
|
||||
boxed_type_str: str = msg.boxed_type_str
|
||||
boxed_type: Type[BaseException] = get_err_type(boxed_type_str)
|
||||
boxed_type: Type[BaseException]|None = get_err_type(
|
||||
boxed_type_str
|
||||
)
|
||||
|
||||
# retrieve the error's msg-encoded remotoe-env info
|
||||
message: str = f'remote task raised a {msg.boxed_type_str!r}\n'
|
||||
if boxed_type is None:
|
||||
log.warning(
|
||||
f'Failed to resolve remote error type\n'
|
||||
f'`{boxed_type_str}` - boxing as\n'
|
||||
f'`RemoteActorError` with original\n'
|
||||
f'traceback preserved.\n'
|
||||
f'\n'
|
||||
f'Call `reg_err_types()` to enable\n'
|
||||
f'full type reconstruction.\n'
|
||||
)
|
||||
|
||||
# retrieve the error's msg-encoded remote-env
|
||||
# info
|
||||
message: str = (
|
||||
f'remote task raised a '
|
||||
f'{msg.boxed_type_str!r}\n'
|
||||
)
|
||||
|
||||
# TODO: do we even really need these checks for RAEs?
|
||||
if boxed_type_str in [
|
||||
|
|
|
|||
|
|
@ -37,19 +37,20 @@ import warnings
|
|||
|
||||
import trio
|
||||
|
||||
from . import _runtime
|
||||
from .runtime import _runtime
|
||||
from .discovery._registry import Registrar
|
||||
from .devx import (
|
||||
debug,
|
||||
_frame_stack,
|
||||
pformat as _pformat,
|
||||
)
|
||||
from . import _spawn
|
||||
from . import _state
|
||||
from .spawn import _spawn
|
||||
from .runtime import _state
|
||||
from . import log
|
||||
from .ipc import (
|
||||
_connect_chan,
|
||||
)
|
||||
from ._addr import (
|
||||
from .discovery._addr import (
|
||||
Address,
|
||||
UnwrappedAddress,
|
||||
default_lo_addrs,
|
||||
|
|
@ -267,7 +268,6 @@ async def open_root_actor(
|
|||
if start_method is not None:
|
||||
_spawn.try_set_start_method(start_method)
|
||||
|
||||
# TODO! remove this ASAP!
|
||||
if arbiter_addr is not None:
|
||||
warnings.warn(
|
||||
'`arbiter_addr` is now deprecated\n'
|
||||
|
|
@ -400,7 +400,7 @@ async def open_root_actor(
|
|||
'registry socket(s) already bound'
|
||||
)
|
||||
|
||||
# we were able to connect to an arbiter
|
||||
# we were able to connect to a registrar
|
||||
logger.info(
|
||||
f'Registry(s) seem(s) to exist @ {ponged_addrs}'
|
||||
)
|
||||
|
|
@ -453,8 +453,7 @@ async def open_root_actor(
|
|||
# https://github.com/goodboy/tractor/pull/348
|
||||
# https://github.com/goodboy/tractor/issues/296
|
||||
|
||||
# TODO: rename as `RootActor` or is that even necessary?
|
||||
actor = _runtime.Arbiter(
|
||||
actor = Registrar(
|
||||
name=name or 'registrar',
|
||||
uuid=mk_uuid(),
|
||||
registry_addrs=registry_addrs,
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ from tractor.msg import (
|
|||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._runtime import Actor
|
||||
from .runtime._runtime import Actor
|
||||
from ._context import Context
|
||||
from .ipc import Channel
|
||||
|
||||
|
|
|
|||
|
|
@ -26,9 +26,7 @@ import random
|
|||
from typing import (
|
||||
Type,
|
||||
)
|
||||
from tractor import (
|
||||
_addr,
|
||||
)
|
||||
from tractor.discovery import _addr
|
||||
|
||||
|
||||
def get_rando_addr(
|
||||
|
|
|
|||
|
|
@ -21,17 +21,27 @@ and applications.
|
|||
'''
|
||||
from functools import (
|
||||
partial,
|
||||
wraps,
|
||||
)
|
||||
import inspect
|
||||
import platform
|
||||
from typing import (
|
||||
Callable,
|
||||
Type,
|
||||
)
|
||||
|
||||
import pytest
|
||||
import tractor
|
||||
import trio
|
||||
import wrapt
|
||||
|
||||
|
||||
def tractor_test(fn):
|
||||
def tractor_test(
|
||||
wrapped: Callable|None = None,
|
||||
*,
|
||||
# @tractor_test(<deco-params>)
|
||||
timeout:float = 30,
|
||||
hide_tb: bool = True,
|
||||
):
|
||||
'''
|
||||
Decorator for async test fns to decorator-wrap them as "native"
|
||||
looking sync funcs runnable by `pytest` and auto invoked with
|
||||
|
|
@ -45,8 +55,18 @@ def tractor_test(fn):
|
|||
Basic deco use:
|
||||
---------------
|
||||
|
||||
@tractor_test
|
||||
async def test_whatever():
|
||||
@tractor_test(
|
||||
timeout=10,
|
||||
)
|
||||
async def test_whatever(
|
||||
# fixture param declarations
|
||||
loglevel: str,
|
||||
start_method: str,
|
||||
reg_addr: tuple,
|
||||
tpt_proto: str,
|
||||
debug_mode: bool,
|
||||
):
|
||||
# already inside a root-actor runtime `trio.Task`
|
||||
await ...
|
||||
|
||||
|
||||
|
|
@ -55,7 +75,7 @@ def tractor_test(fn):
|
|||
If any of the following fixture are requested by the wrapped test
|
||||
fn (via normal func-args declaration),
|
||||
|
||||
- `reg_addr` (a socket addr tuple where arbiter is listening)
|
||||
- `reg_addr` (a socket addr tuple where registrar is listening)
|
||||
- `loglevel` (logging level passed to tractor internals)
|
||||
- `start_method` (subprocess spawning backend)
|
||||
|
||||
|
|
@ -67,52 +87,69 @@ def tractor_test(fn):
|
|||
`tractor.open_root_actor()` funcargs.
|
||||
|
||||
'''
|
||||
@wraps(fn)
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
||||
# handle the decorator not called with () case.
|
||||
# i.e. in `wrapt` support a deco-with-optional-args,
|
||||
# https://wrapt.readthedocs.io/en/master/decorators.html#decorators-with-optional-arguments
|
||||
if wrapped is None:
|
||||
return wrapt.PartialCallableObjectProxy(
|
||||
tractor_test,
|
||||
timeout=timeout,
|
||||
hide_tb=hide_tb
|
||||
)
|
||||
|
||||
@wrapt.decorator
|
||||
def wrapper(
|
||||
*args,
|
||||
loglevel=None,
|
||||
reg_addr=None,
|
||||
start_method: str|None = None,
|
||||
debug_mode: bool = False,
|
||||
tpt_proto: str|None=None,
|
||||
**kwargs
|
||||
wrapped: Callable,
|
||||
instance: object|Type|None,
|
||||
args: tuple,
|
||||
kwargs: dict,
|
||||
):
|
||||
# __tracebackhide__ = True
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
||||
# NOTE: inject ant test func declared fixture
|
||||
# names by manually checking!
|
||||
if 'reg_addr' in inspect.signature(fn).parameters:
|
||||
# injects test suite fixture value to test as well
|
||||
# as `run()`
|
||||
kwargs['reg_addr'] = reg_addr
|
||||
# NOTE, ensure we inject any test-fn declared fixture names.
|
||||
for kw in [
|
||||
'reg_addr',
|
||||
'loglevel',
|
||||
'start_method',
|
||||
'debug_mode',
|
||||
'tpt_proto',
|
||||
'timeout',
|
||||
]:
|
||||
if kw in inspect.signature(wrapped).parameters:
|
||||
assert kw in kwargs
|
||||
|
||||
if 'loglevel' in inspect.signature(fn).parameters:
|
||||
# allows test suites to define a 'loglevel' fixture
|
||||
# that activates the internal logging
|
||||
kwargs['loglevel'] = loglevel
|
||||
start_method = kwargs.get('start_method')
|
||||
if platform.system() == "Windows":
|
||||
if start_method is None:
|
||||
kwargs['start_method'] = 'trio'
|
||||
elif start_method != 'trio':
|
||||
raise ValueError(
|
||||
'ONLY the `start_method="trio"` is supported on Windows.'
|
||||
)
|
||||
|
||||
if start_method is None:
|
||||
if platform.system() == "Windows":
|
||||
start_method = 'trio'
|
||||
# open a root-actor, passing certain
|
||||
# `tractor`-runtime-settings, then invoke the test-fn body as
|
||||
# the root-most task.
|
||||
#
|
||||
# https://wrapt.readthedocs.io/en/master/decorators.html#processing-function-arguments
|
||||
async def _main(
|
||||
*args,
|
||||
|
||||
if 'start_method' in inspect.signature(fn).parameters:
|
||||
# set of subprocess spawning backends
|
||||
kwargs['start_method'] = start_method
|
||||
# runtime-settings
|
||||
loglevel:str|None = None,
|
||||
reg_addr:tuple|None = None,
|
||||
start_method: str|None = None,
|
||||
debug_mode: bool = False,
|
||||
tpt_proto: str|None = None,
|
||||
|
||||
if 'debug_mode' in inspect.signature(fn).parameters:
|
||||
# set of subprocess spawning backends
|
||||
kwargs['debug_mode'] = debug_mode
|
||||
**kwargs,
|
||||
):
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
||||
if 'tpt_proto' in inspect.signature(fn).parameters:
|
||||
# set of subprocess spawning backends
|
||||
kwargs['tpt_proto'] = tpt_proto
|
||||
|
||||
if kwargs:
|
||||
|
||||
# use explicit root actor start
|
||||
async def _main():
|
||||
with trio.fail_after(timeout):
|
||||
async with tractor.open_root_actor(
|
||||
# **kwargs,
|
||||
registry_addrs=[reg_addr] if reg_addr else None,
|
||||
loglevel=loglevel,
|
||||
start_method=start_method,
|
||||
|
|
@ -121,17 +158,31 @@ def tractor_test(fn):
|
|||
debug_mode=debug_mode,
|
||||
|
||||
):
|
||||
await fn(*args, **kwargs)
|
||||
# invoke test-fn body IN THIS task
|
||||
await wrapped(
|
||||
*args,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
main = _main
|
||||
funcname = wrapped.__name__
|
||||
if not inspect.iscoroutinefunction(wrapped):
|
||||
raise TypeError(
|
||||
f"Test-fn {funcname!r} must be an async-function !!"
|
||||
)
|
||||
|
||||
else:
|
||||
# use implicit root actor start
|
||||
main = partial(fn, *args, **kwargs)
|
||||
# invoke runtime via a root task.
|
||||
return trio.run(
|
||||
partial(
|
||||
_main,
|
||||
*args,
|
||||
**kwargs,
|
||||
)
|
||||
)
|
||||
|
||||
return trio.run(main)
|
||||
|
||||
return wrapper
|
||||
return wrapper(
|
||||
wrapped,
|
||||
)
|
||||
|
||||
|
||||
def pytest_addoption(
|
||||
|
|
@ -179,7 +230,8 @@ def pytest_addoption(
|
|||
|
||||
def pytest_configure(config):
|
||||
backend = config.option.spawn_backend
|
||||
tractor._spawn.try_set_start_method(backend)
|
||||
from tractor.spawn._spawn import try_set_start_method
|
||||
try_set_start_method(backend)
|
||||
|
||||
# register custom marks to avoid warnings see,
|
||||
# https://docs.pytest.org/en/stable/how-to/writing_plugins.html#registering-custom-markers
|
||||
|
|
@ -225,7 +277,8 @@ def tpt_protos(request) -> list[str]:
|
|||
|
||||
# XXX ensure we support the protocol by name via lookup!
|
||||
for proto_key in proto_keys:
|
||||
addr_type = tractor._addr._address_types[proto_key]
|
||||
from tractor.discovery import _addr
|
||||
addr_type = _addr._address_types[proto_key]
|
||||
assert addr_type.proto_key == proto_key
|
||||
|
||||
yield proto_keys
|
||||
|
|
@ -256,7 +309,7 @@ def tpt_proto(
|
|||
# f'tpt-proto={proto_key!r}\n'
|
||||
# )
|
||||
|
||||
from tractor import _state
|
||||
from tractor.runtime import _state
|
||||
if _state._def_tpt_proto != proto_key:
|
||||
_state._def_tpt_proto = proto_key
|
||||
_state._runtime_vars['_enable_tpts'] = [
|
||||
|
|
|
|||
|
|
@ -45,17 +45,15 @@ from typing import (
|
|||
)
|
||||
|
||||
import trio
|
||||
from tractor import (
|
||||
_state,
|
||||
log as logmod,
|
||||
)
|
||||
from tractor.runtime import _state
|
||||
from tractor import log as logmod
|
||||
from tractor.devx import debug
|
||||
|
||||
log = logmod.get_logger()
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from tractor._spawn import ProcessType
|
||||
from tractor.spawn._spawn import ProcessType
|
||||
from tractor import (
|
||||
Actor,
|
||||
ActorNursery,
|
||||
|
|
|
|||
|
|
@ -53,8 +53,8 @@ import trio
|
|||
from tractor._exceptions import (
|
||||
NoRuntime,
|
||||
)
|
||||
from tractor import _state
|
||||
from tractor._state import (
|
||||
from tractor.runtime import _state
|
||||
from tractor.runtime._state import (
|
||||
current_actor,
|
||||
debug_mode,
|
||||
)
|
||||
|
|
@ -76,7 +76,7 @@ from ._repl import (
|
|||
|
||||
if TYPE_CHECKING:
|
||||
from trio.lowlevel import Task
|
||||
from tractor._runtime import (
|
||||
from tractor.runtime._runtime import (
|
||||
Actor,
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ from functools import (
|
|||
import os
|
||||
|
||||
import pdbp
|
||||
from tractor._state import (
|
||||
from tractor.runtime._state import (
|
||||
is_root_process,
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ from typing import (
|
|||
)
|
||||
import trio
|
||||
from tractor.log import get_logger
|
||||
from tractor._state import (
|
||||
from tractor.runtime._state import (
|
||||
current_actor,
|
||||
is_root_process,
|
||||
)
|
||||
|
|
@ -44,7 +44,7 @@ if TYPE_CHECKING:
|
|||
from tractor.ipc import (
|
||||
Channel,
|
||||
)
|
||||
from tractor._runtime import (
|
||||
from tractor.runtime._runtime import (
|
||||
Actor,
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ from trio.lowlevel import (
|
|||
Task,
|
||||
)
|
||||
from tractor._context import Context
|
||||
from tractor._state import (
|
||||
from tractor.runtime._state import (
|
||||
current_actor,
|
||||
debug_mode,
|
||||
is_root_process,
|
||||
|
|
|
|||
|
|
@ -55,12 +55,12 @@ import tractor
|
|||
from tractor.log import get_logger
|
||||
from tractor.to_asyncio import run_trio_task_in_future
|
||||
from tractor._context import Context
|
||||
from tractor import _state
|
||||
from tractor.runtime import _state
|
||||
from tractor._exceptions import (
|
||||
NoRuntime,
|
||||
InternalError,
|
||||
)
|
||||
from tractor._state import (
|
||||
from tractor.runtime._state import (
|
||||
current_actor,
|
||||
current_ipc_ctx,
|
||||
is_root_process,
|
||||
|
|
@ -87,7 +87,7 @@ from ..pformat import (
|
|||
if TYPE_CHECKING:
|
||||
from trio.lowlevel import Task
|
||||
from threading import Thread
|
||||
from tractor._runtime import (
|
||||
from tractor.runtime._runtime import (
|
||||
Actor,
|
||||
)
|
||||
# from ._post_mortem import BoxedMaybeException
|
||||
|
|
|
|||
|
|
@ -55,12 +55,12 @@ import tractor
|
|||
from tractor.to_asyncio import run_trio_task_in_future
|
||||
from tractor.log import get_logger
|
||||
from tractor._context import Context
|
||||
from tractor import _state
|
||||
from tractor.runtime import _state
|
||||
from tractor._exceptions import (
|
||||
DebugRequestError,
|
||||
InternalError,
|
||||
)
|
||||
from tractor._state import (
|
||||
from tractor.runtime._state import (
|
||||
current_actor,
|
||||
is_root_process,
|
||||
)
|
||||
|
|
@ -71,7 +71,7 @@ if TYPE_CHECKING:
|
|||
from tractor.ipc import (
|
||||
IPCServer,
|
||||
)
|
||||
from tractor._runtime import (
|
||||
from tractor.runtime._runtime import (
|
||||
Actor,
|
||||
)
|
||||
from ._repl import (
|
||||
|
|
@ -1013,7 +1013,7 @@ async def request_root_stdio_lock(
|
|||
DebugStatus.req_task = current_task()
|
||||
req_err: BaseException|None = None
|
||||
try:
|
||||
from tractor._discovery import get_root
|
||||
from tractor.discovery._discovery import get_root
|
||||
# NOTE: we need this to ensure that this task exits
|
||||
# BEFORE the REPl instance raises an error like
|
||||
# `bdb.BdbQuit` directly, OW you get a trio cs stack
|
||||
|
|
|
|||
|
|
@ -0,0 +1,26 @@
|
|||
# tractor: structured concurrent "actors".
|
||||
# Copyright 2018-eternity Tyler Goodlet.
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
Discovery (protocols) API for automatic addressing
|
||||
and location management of (service) actors.
|
||||
|
||||
NOTE: to avoid circular imports, this ``__init__``
|
||||
does NOT eagerly import submodules. Use direct
|
||||
module paths like ``tractor.discovery._addr`` or
|
||||
``tractor.discovery._discovery`` instead.
|
||||
|
||||
'''
|
||||
|
|
@ -27,15 +27,15 @@ from trio import (
|
|||
SocketListener,
|
||||
)
|
||||
|
||||
from .log import get_logger
|
||||
from ._state import (
|
||||
from ..log import get_logger
|
||||
from ..runtime._state import (
|
||||
_def_tpt_proto,
|
||||
)
|
||||
from .ipc._tcp import TCPAddress
|
||||
from .ipc._uds import UDSAddress
|
||||
from ..ipc._tcp import TCPAddress
|
||||
from ..ipc._uds import UDSAddress
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._runtime import Actor
|
||||
from ..runtime._runtime import Actor
|
||||
|
||||
log = get_logger()
|
||||
|
||||
|
|
@ -28,29 +28,29 @@ from typing import (
|
|||
from contextlib import asynccontextmanager as acm
|
||||
|
||||
from tractor.log import get_logger
|
||||
from .trionics import (
|
||||
from ..trionics import (
|
||||
gather_contexts,
|
||||
collapse_eg,
|
||||
)
|
||||
from .ipc import _connect_chan, Channel
|
||||
from ..ipc import _connect_chan, Channel
|
||||
from ._addr import (
|
||||
UnwrappedAddress,
|
||||
Address,
|
||||
wrap_address
|
||||
)
|
||||
from ._portal import (
|
||||
from ..runtime._portal import (
|
||||
Portal,
|
||||
open_portal,
|
||||
LocalPortal,
|
||||
)
|
||||
from ._state import (
|
||||
from ..runtime._state import (
|
||||
current_actor,
|
||||
_runtime_vars,
|
||||
_def_tpt_proto,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._runtime import Actor
|
||||
from ..runtime._runtime import Actor
|
||||
|
||||
|
||||
log = get_logger()
|
||||
|
|
@ -60,7 +60,7 @@ log = get_logger()
|
|||
async def get_registry(
|
||||
addr: UnwrappedAddress|None = None,
|
||||
) -> AsyncGenerator[
|
||||
Portal | LocalPortal | None,
|
||||
Portal|LocalPortal|None,
|
||||
None,
|
||||
]:
|
||||
'''
|
||||
|
|
@ -72,8 +72,8 @@ async def get_registry(
|
|||
'''
|
||||
actor: Actor = current_actor()
|
||||
if actor.is_registrar:
|
||||
# we're already the arbiter
|
||||
# (likely a re-entrant call from the arbiter actor)
|
||||
# we're already the registrar
|
||||
# (likely a re-entrant call from the registrar actor)
|
||||
yield LocalPortal(
|
||||
actor,
|
||||
Channel(transport=None)
|
||||
|
|
@ -153,21 +153,27 @@ async def query_actor(
|
|||
regaddr: UnwrappedAddress|None = None,
|
||||
|
||||
) -> AsyncGenerator[
|
||||
UnwrappedAddress|None,
|
||||
tuple[UnwrappedAddress|None, Portal|LocalPortal|None],
|
||||
None,
|
||||
]:
|
||||
'''
|
||||
Lookup a transport address (by actor name) via querying a registrar
|
||||
listening @ `regaddr`.
|
||||
|
||||
Returns the transport protocol (socket) address or `None` if no
|
||||
entry under that name exists.
|
||||
Yields a `tuple` of `(addr, reg_portal)` where,
|
||||
- `addr` is the transport protocol (socket) address or `None` if
|
||||
no entry under that name exists,
|
||||
- `reg_portal` is the `Portal` (or `LocalPortal` when the
|
||||
current actor is the registrar) used for the lookup (or
|
||||
`None` when the peer was found locally via
|
||||
`get_peer_by_name()`).
|
||||
|
||||
'''
|
||||
actor: Actor = current_actor()
|
||||
if (
|
||||
name == 'registrar'
|
||||
and actor.is_registrar
|
||||
and
|
||||
actor.is_registrar
|
||||
):
|
||||
raise RuntimeError(
|
||||
'The current actor IS the registry!?'
|
||||
|
|
@ -175,10 +181,10 @@ async def query_actor(
|
|||
|
||||
maybe_peers: list[Channel]|None = get_peer_by_name(name)
|
||||
if maybe_peers:
|
||||
yield maybe_peers[0].raddr
|
||||
yield maybe_peers[0].raddr, None
|
||||
return
|
||||
|
||||
reg_portal: Portal
|
||||
reg_portal: Portal|LocalPortal
|
||||
regaddr: Address = wrap_address(regaddr) or actor.reg_addrs[0]
|
||||
async with get_registry(regaddr) as reg_portal:
|
||||
# TODO: return portals to all available actors - for now
|
||||
|
|
@ -188,8 +194,7 @@ async def query_actor(
|
|||
'find_actor',
|
||||
name=name,
|
||||
)
|
||||
yield addr
|
||||
|
||||
yield addr, reg_portal
|
||||
|
||||
@acm
|
||||
async def maybe_open_portal(
|
||||
|
|
@ -204,15 +209,49 @@ async def maybe_open_portal(
|
|||
async with query_actor(
|
||||
name=name,
|
||||
regaddr=addr,
|
||||
) as addr:
|
||||
pass
|
||||
) as (addr, reg_portal):
|
||||
if not addr:
|
||||
yield None
|
||||
return
|
||||
|
||||
if addr:
|
||||
async with _connect_chan(addr) as chan:
|
||||
async with open_portal(chan) as portal:
|
||||
yield portal
|
||||
else:
|
||||
yield None
|
||||
try:
|
||||
async with _connect_chan(addr) as chan:
|
||||
async with open_portal(chan) as portal:
|
||||
yield portal
|
||||
|
||||
# most likely we were unable to connect the
|
||||
# transport and there is likely a stale entry in
|
||||
# the registry actor's table, thus we need to
|
||||
# instruct it to clear that stale entry and then
|
||||
# more silently (pretend there was no reason but
|
||||
# to) indicate that the target actor can't be
|
||||
# contacted at that addr.
|
||||
except OSError:
|
||||
# NOTE: ensure we delete the stale entry
|
||||
# from the registrar actor when available.
|
||||
if reg_portal is not None:
|
||||
uid: tuple[str, str]|None = await reg_portal.run_from_ns(
|
||||
'self',
|
||||
'delete_addr',
|
||||
addr=addr,
|
||||
)
|
||||
if uid:
|
||||
log.warning(
|
||||
f'Deleted stale registry entry !\n'
|
||||
f'addr: {addr!r}\n'
|
||||
f'uid: {uid!r}\n'
|
||||
)
|
||||
else:
|
||||
log.warning(
|
||||
f'No registry entry found for addr: {addr!r}'
|
||||
)
|
||||
else:
|
||||
log.warning(
|
||||
f'Connection to {addr!r} failed'
|
||||
f' and no registry portal available'
|
||||
f' to delete stale entry.'
|
||||
)
|
||||
yield None
|
||||
|
||||
|
||||
@acm
|
||||
|
|
@ -229,10 +268,10 @@ async def find_actor(
|
|||
None,
|
||||
]:
|
||||
'''
|
||||
Ask the arbiter to find actor(s) by name.
|
||||
Ask the registrar to find actor(s) by name.
|
||||
|
||||
Returns a connected portal to the last registered matching actor
|
||||
known to the arbiter.
|
||||
Returns a connected portal to the last registered
|
||||
matching actor known to the registrar.
|
||||
|
||||
'''
|
||||
# optimization path, use any pre-existing peer channel
|
||||
|
|
@ -280,7 +319,7 @@ async def find_actor(
|
|||
if not any(portals):
|
||||
if raise_on_none:
|
||||
raise RuntimeError(
|
||||
f'No actor "{name}" found registered @ {registry_addrs}'
|
||||
f'No actor {name!r} found registered @ {registry_addrs!r}'
|
||||
)
|
||||
yield None
|
||||
return
|
||||
|
|
@ -0,0 +1,253 @@
|
|||
# tractor: structured concurrent "actors".
|
||||
# Copyright 2018-eternity Tyler Goodlet.
|
||||
|
||||
# This program is free software: you can redistribute it and/or
|
||||
# modify it under the terms of the GNU Affero General Public
|
||||
# License as published by the Free Software Foundation, either
|
||||
# version 3 of the License, or (at your option) any later
|
||||
# version.
|
||||
|
||||
# This program is distributed in the hope that it will be
|
||||
# useful, but WITHOUT ANY WARRANTY; without even the implied
|
||||
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
|
||||
# PURPOSE. See the GNU Affero General Public License for more
|
||||
# details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General
|
||||
# Public License along with this program. If not, see
|
||||
# <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
Actor-registry for process-tree service discovery.
|
||||
|
||||
The `Registrar` is a special `Actor` subtype that serves as
|
||||
the process-tree's name-registry, tracking actor
|
||||
name-to-address mappings so peers can discover each other.
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
|
||||
from bidict import bidict
|
||||
import trio
|
||||
|
||||
from ..runtime._runtime import Actor
|
||||
from ._addr import (
|
||||
UnwrappedAddress,
|
||||
Address,
|
||||
wrap_address,
|
||||
)
|
||||
from ..devx import debug
|
||||
from ..log import get_logger
|
||||
|
||||
|
||||
log = get_logger('tractor')
|
||||
|
||||
|
||||
class Registrar(Actor):
|
||||
'''
|
||||
A special registrar `Actor` who can contact all other
|
||||
actors within its immediate process tree and keeps
|
||||
a registry of others meant to be discoverable in
|
||||
a distributed application.
|
||||
|
||||
Normally the registrar is also the "root actor" and
|
||||
thus always has access to the top-most-level actor
|
||||
(process) nursery.
|
||||
|
||||
By default, the registrar is always initialized when
|
||||
and if no other registrar socket addrs have been
|
||||
specified to runtime init entry-points (such as
|
||||
`open_root_actor()` or `open_nursery()`). Any time
|
||||
a new main process is launched (and thus a new root
|
||||
actor created) and, no existing registrar can be
|
||||
contacted at the provided `registry_addr`, then
|
||||
a new one is always created; however, if one can be
|
||||
reached it is used.
|
||||
|
||||
Normally a distributed app requires at least one
|
||||
registrar per logical host where for that given
|
||||
"host space" (aka localhost IPC domain of addresses)
|
||||
it is responsible for making all other host (local
|
||||
address) bound actors *discoverable* to external
|
||||
actor trees running on remote hosts.
|
||||
|
||||
'''
|
||||
is_registrar = True
|
||||
|
||||
def is_registry(self) -> bool:
|
||||
return self.is_registrar
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*args,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
|
||||
self._registry: bidict[
|
||||
tuple[str, str],
|
||||
UnwrappedAddress,
|
||||
] = bidict({})
|
||||
self._waiters: dict[
|
||||
str,
|
||||
# either an event to sync to receiving an
|
||||
# actor uid (which is filled in once the actor
|
||||
# has sucessfully registered), or that uid
|
||||
# after registry is complete.
|
||||
list[trio.Event|tuple[str, str]]
|
||||
] = {}
|
||||
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
async def find_actor(
|
||||
self,
|
||||
name: str,
|
||||
|
||||
) -> UnwrappedAddress|None:
|
||||
|
||||
for uid, addr in self._registry.items():
|
||||
if name in uid:
|
||||
return addr
|
||||
|
||||
return None
|
||||
|
||||
async def get_registry(
|
||||
self
|
||||
|
||||
) -> dict[str, UnwrappedAddress]:
|
||||
'''
|
||||
Return current name registry.
|
||||
|
||||
This method is async to allow for cross-actor
|
||||
invocation.
|
||||
|
||||
'''
|
||||
# NOTE: requires ``strict_map_key=False`` to the
|
||||
# msgpack unpacker since we have tuples as keys
|
||||
# (note this makes the registrar suscetible to
|
||||
# hashdos):
|
||||
# https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10
|
||||
return {
|
||||
'.'.join(key): val
|
||||
for key, val in self._registry.items()
|
||||
}
|
||||
|
||||
async def wait_for_actor(
|
||||
self,
|
||||
name: str,
|
||||
|
||||
) -> list[UnwrappedAddress]:
|
||||
'''
|
||||
Wait for a particular actor to register.
|
||||
|
||||
This is a blocking call if no actor by the
|
||||
provided name is currently registered.
|
||||
|
||||
'''
|
||||
addrs: list[UnwrappedAddress] = []
|
||||
addr: UnwrappedAddress
|
||||
|
||||
mailbox_info: str = (
|
||||
'Actor registry contact infos:\n'
|
||||
)
|
||||
for uid, addr in self._registry.items():
|
||||
mailbox_info += (
|
||||
f'|_uid: {uid}\n'
|
||||
f'|_addr: {addr}\n\n'
|
||||
)
|
||||
if name == uid[0]:
|
||||
addrs.append(addr)
|
||||
|
||||
if not addrs:
|
||||
waiter = trio.Event()
|
||||
self._waiters.setdefault(
|
||||
name, []
|
||||
).append(waiter)
|
||||
await waiter.wait()
|
||||
|
||||
for uid in self._waiters[name]:
|
||||
if not isinstance(uid, trio.Event):
|
||||
addrs.append(
|
||||
self._registry[uid]
|
||||
)
|
||||
|
||||
log.runtime(mailbox_info)
|
||||
return addrs
|
||||
|
||||
async def register_actor(
|
||||
self,
|
||||
uid: tuple[str, str],
|
||||
addr: UnwrappedAddress
|
||||
) -> None:
|
||||
uid = name, hash = (
|
||||
str(uid[0]),
|
||||
str(uid[1]),
|
||||
)
|
||||
waddr: Address = wrap_address(addr)
|
||||
if not waddr.is_valid:
|
||||
# should never be 0-dynamic-os-alloc
|
||||
await debug.pause()
|
||||
|
||||
# XXX NOTE, value must also be hashable AND since
|
||||
# `._registry` is a `bidict` values must be unique;
|
||||
# use `.forceput()` to replace any prior (stale)
|
||||
# entries that might map a different uid to the same
|
||||
# addr (e.g. after an unclean shutdown or
|
||||
# actor-restart reusing the same address).
|
||||
self._registry.forceput(uid, tuple(addr))
|
||||
|
||||
# pop and signal all waiter events
|
||||
events = self._waiters.pop(name, [])
|
||||
self._waiters.setdefault(
|
||||
name, []
|
||||
).append(uid)
|
||||
for event in events:
|
||||
if isinstance(event, trio.Event):
|
||||
event.set()
|
||||
|
||||
async def unregister_actor(
|
||||
self,
|
||||
uid: tuple[str, str]
|
||||
|
||||
) -> None:
|
||||
uid = (str(uid[0]), str(uid[1]))
|
||||
entry: tuple = self._registry.pop(
|
||||
uid, None
|
||||
)
|
||||
if entry is None:
|
||||
log.warning(
|
||||
f'Request to de-register'
|
||||
f' {uid!r} failed?'
|
||||
)
|
||||
|
||||
async def delete_addr(
|
||||
self,
|
||||
addr: tuple[str, int|str]|list[str|int],
|
||||
) -> tuple[str, str]|None:
|
||||
# NOTE: `addr` arrives as a `list` over IPC
|
||||
# (msgpack deserializes tuples -> lists) so
|
||||
# coerce to `tuple` for the bidict hash lookup.
|
||||
uid: tuple[str, str]|None = (
|
||||
self._registry.inverse.pop(
|
||||
tuple(addr),
|
||||
None,
|
||||
)
|
||||
)
|
||||
if uid:
|
||||
report: str = (
|
||||
'Deleting registry-entry for,\n'
|
||||
)
|
||||
else:
|
||||
report: str = (
|
||||
'No registry entry for,\n'
|
||||
)
|
||||
|
||||
log.warning(
|
||||
report
|
||||
+
|
||||
f'{addr!r}@{uid!r}'
|
||||
)
|
||||
return uid
|
||||
|
||||
|
||||
# Backward compat alias
|
||||
Arbiter = Registrar
|
||||
|
|
@ -146,7 +146,7 @@ _pubtask2lock: dict[str, trio.StrictFIFOLock] = {}
|
|||
|
||||
|
||||
def pub(
|
||||
wrapped: typing.Callable | None = None,
|
||||
wrapped: typing.Callable|None = None,
|
||||
*,
|
||||
tasks: set[str] = set(),
|
||||
):
|
||||
|
|
@ -244,8 +244,12 @@ def pub(
|
|||
task2lock[name] = trio.StrictFIFOLock()
|
||||
|
||||
@wrapt.decorator
|
||||
async def wrapper(agen, instance, args, kwargs):
|
||||
|
||||
async def wrapper(
|
||||
agen,
|
||||
instance,
|
||||
args,
|
||||
kwargs,
|
||||
):
|
||||
# XXX: this is used to extract arguments properly as per the
|
||||
# `wrapt` docs
|
||||
async def _execute(
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ from ._types import (
|
|||
transport_from_addr,
|
||||
transport_from_stream,
|
||||
)
|
||||
from tractor._addr import (
|
||||
from tractor.discovery._addr import (
|
||||
is_wrapped_addr,
|
||||
wrap_address,
|
||||
Address,
|
||||
|
|
|
|||
|
|
@ -50,26 +50,24 @@ from ..devx.pformat import (
|
|||
from .._exceptions import (
|
||||
TransportClosed,
|
||||
)
|
||||
from .. import _rpc
|
||||
from ..runtime import _rpc
|
||||
from ..msg import (
|
||||
MsgType,
|
||||
Struct,
|
||||
types as msgtypes,
|
||||
)
|
||||
from ..trionics import maybe_open_nursery
|
||||
from .. import (
|
||||
_state,
|
||||
log,
|
||||
)
|
||||
from .._addr import Address
|
||||
from ..runtime import _state
|
||||
from .. import log
|
||||
from ..discovery._addr import Address
|
||||
from ._chan import Channel
|
||||
from ._transport import MsgTransport
|
||||
from ._uds import UDSAddress
|
||||
from ._tcp import TCPAddress
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .._runtime import Actor
|
||||
from .._supervise import ActorNursery
|
||||
from ..runtime._runtime import Actor
|
||||
from ..runtime._supervise import ActorNursery
|
||||
|
||||
|
||||
log = log.get_logger()
|
||||
|
|
@ -357,7 +355,7 @@ async def handle_stream_from_peer(
|
|||
# and `MsgpackStream._inter_packets()` on a read from the
|
||||
# stream particularly when the runtime is first starting up
|
||||
# inside `open_root_actor()` where there is a check for
|
||||
# a bound listener on the "arbiter" addr. the reset will be
|
||||
# a bound listener on the registrar addr. the reset will be
|
||||
# because the handshake was never meant took place.
|
||||
log.runtime(
|
||||
con_status
|
||||
|
|
@ -970,7 +968,7 @@ class Server(Struct):
|
|||
in `accept_addrs`.
|
||||
|
||||
'''
|
||||
from .._addr import (
|
||||
from ..discovery._addr import (
|
||||
default_lo_addrs,
|
||||
wrap_address,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -54,7 +54,7 @@ from tractor.msg import (
|
|||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from tractor._addr import Address
|
||||
from tractor.discovery._addr import Address
|
||||
|
||||
log = get_logger()
|
||||
|
||||
|
|
@ -225,7 +225,7 @@ class MsgpackTransport(MsgTransport):
|
|||
|
||||
# not sure entirely why we need this but without it we
|
||||
# seem to be getting racy failures here on
|
||||
# arbiter/registry name subs..
|
||||
# registrar name subs..
|
||||
trio.BrokenResourceError,
|
||||
|
||||
) as trans_err:
|
||||
|
|
|
|||
|
|
@ -53,14 +53,14 @@ from tractor.log import get_logger
|
|||
from tractor.ipc._transport import (
|
||||
MsgpackTransport,
|
||||
)
|
||||
from tractor._state import (
|
||||
from tractor.runtime._state import (
|
||||
get_rt_dir,
|
||||
current_actor,
|
||||
is_root_process,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._runtime import Actor
|
||||
from tractor.runtime._runtime import Actor
|
||||
|
||||
|
||||
# Platform-specific credential passing constants
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ import colorlog # type: ignore
|
|||
# import colored_traceback.auto # ?TODO, need better config?
|
||||
import trio
|
||||
|
||||
from ._state import current_actor
|
||||
from .runtime._state import current_actor
|
||||
|
||||
|
||||
_default_loglevel: str = 'ERROR'
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ from tractor._exceptions import (
|
|||
_mk_recv_mte,
|
||||
pack_error,
|
||||
)
|
||||
from tractor._state import (
|
||||
from tractor.runtime._state import (
|
||||
current_ipc_ctx,
|
||||
)
|
||||
from ._codec import (
|
||||
|
|
|
|||
|
|
@ -0,0 +1,26 @@
|
|||
# tractor: structured concurrent "actors".
|
||||
# Copyright 2018-eternity Tyler Goodlet.
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
The actor runtime: core machinery for the
|
||||
actor-model implemented on a `trio` task runtime.
|
||||
|
||||
NOTE: to avoid circular imports, this ``__init__``
|
||||
does NOT eagerly import submodules. Use direct
|
||||
module paths like ``tractor.runtime._state`` or
|
||||
``tractor.runtime._runtime`` instead.
|
||||
|
||||
'''
|
||||
|
|
@ -39,30 +39,30 @@ import warnings
|
|||
|
||||
import trio
|
||||
|
||||
from .trionics import (
|
||||
from ..trionics import (
|
||||
maybe_open_nursery,
|
||||
collapse_eg,
|
||||
)
|
||||
from ._state import (
|
||||
current_actor,
|
||||
)
|
||||
from .ipc import Channel
|
||||
from .log import get_logger
|
||||
from .msg import (
|
||||
from ..ipc import Channel
|
||||
from ..log import get_logger
|
||||
from ..msg import (
|
||||
# Error,
|
||||
PayloadMsg,
|
||||
NamespacePath,
|
||||
Return,
|
||||
)
|
||||
from ._exceptions import (
|
||||
from .._exceptions import (
|
||||
NoResult,
|
||||
TransportClosed,
|
||||
)
|
||||
from ._context import (
|
||||
from .._context import (
|
||||
Context,
|
||||
open_context_from_portal,
|
||||
)
|
||||
from ._streaming import (
|
||||
from .._streaming import (
|
||||
MsgStream,
|
||||
)
|
||||
|
||||
|
|
@ -43,11 +43,11 @@ from trio import (
|
|||
TaskStatus,
|
||||
)
|
||||
|
||||
from .ipc import Channel
|
||||
from ._context import (
|
||||
from ..ipc import Channel
|
||||
from .._context import (
|
||||
Context,
|
||||
)
|
||||
from ._exceptions import (
|
||||
from .._exceptions import (
|
||||
ContextCancelled,
|
||||
RemoteActorError,
|
||||
ModuleNotExposed,
|
||||
|
|
@ -56,19 +56,19 @@ from ._exceptions import (
|
|||
pack_error,
|
||||
unpack_error,
|
||||
)
|
||||
from .trionics import (
|
||||
from ..trionics import (
|
||||
collapse_eg,
|
||||
is_multi_cancelled,
|
||||
maybe_raise_from_masking_exc,
|
||||
)
|
||||
from .devx import (
|
||||
from ..devx import (
|
||||
debug,
|
||||
add_div,
|
||||
pformat as _pformat,
|
||||
)
|
||||
from . import _state
|
||||
from .log import get_logger
|
||||
from .msg import (
|
||||
from ..log import get_logger
|
||||
from ..msg import (
|
||||
current_codec,
|
||||
MsgCodec,
|
||||
PayloadT,
|
||||
|
|
@ -83,46 +83,46 @@ from tractor.msg import (
|
|||
pretty_struct,
|
||||
types as msgtypes,
|
||||
)
|
||||
from .trionics import (
|
||||
from ..trionics import (
|
||||
collapse_eg,
|
||||
maybe_open_nursery,
|
||||
)
|
||||
from .ipc import (
|
||||
from ..ipc import (
|
||||
Channel,
|
||||
# IPCServer, # causes cycles atm..
|
||||
_server,
|
||||
)
|
||||
from ._addr import (
|
||||
from ..discovery._addr import (
|
||||
UnwrappedAddress,
|
||||
Address,
|
||||
# default_lo_addrs,
|
||||
get_address_cls,
|
||||
wrap_address,
|
||||
)
|
||||
from ._context import (
|
||||
from .._context import (
|
||||
mk_context,
|
||||
Context,
|
||||
)
|
||||
from .log import get_logger
|
||||
from ._exceptions import (
|
||||
from ..log import get_logger
|
||||
from .._exceptions import (
|
||||
ContextCancelled,
|
||||
InternalError,
|
||||
ModuleNotExposed,
|
||||
MsgTypeError,
|
||||
unpack_error,
|
||||
)
|
||||
from .devx import (
|
||||
from ..devx import (
|
||||
debug,
|
||||
pformat as _pformat
|
||||
)
|
||||
from ._discovery import get_registry
|
||||
from ..discovery._discovery import get_registry
|
||||
from ._portal import Portal
|
||||
from . import _state
|
||||
from . import _mp_fixup_main
|
||||
from ..spawn import _mp_fixup_main
|
||||
from . import _rpc
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._supervise import ActorNursery
|
||||
from ._supervise import ActorNursery # noqa
|
||||
from trio._channel import MemoryChannelState
|
||||
|
||||
|
||||
|
|
@ -175,13 +175,21 @@ class Actor:
|
|||
dialog.
|
||||
|
||||
'''
|
||||
# ugh, we need to get rid of this and replace with a "registry" sys
|
||||
# https://github.com/goodboy/tractor/issues/216
|
||||
is_arbiter: bool = False
|
||||
is_registrar: bool = False
|
||||
|
||||
@property
|
||||
def is_registrar(self) -> bool:
|
||||
return self.is_arbiter
|
||||
def is_arbiter(self) -> bool:
|
||||
'''
|
||||
Deprecated, use `.is_registrar`.
|
||||
|
||||
'''
|
||||
warnings.warn(
|
||||
'`Actor.is_arbiter` is deprecated.\n'
|
||||
'Use `.is_registrar` instead.',
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
return self.is_registrar
|
||||
|
||||
@property
|
||||
def is_root(self) -> bool:
|
||||
|
|
@ -237,7 +245,6 @@ class Actor:
|
|||
registry_addrs: list[Address]|None = None,
|
||||
spawn_method: str|None = None,
|
||||
|
||||
# TODO: remove!
|
||||
arbiter_addr: UnwrappedAddress|None = None,
|
||||
|
||||
) -> None:
|
||||
|
|
@ -287,8 +294,8 @@ class Actor:
|
|||
]
|
||||
|
||||
# marked by the process spawning backend at startup
|
||||
# will be None for the parent most process started manually
|
||||
# by the user (currently called the "arbiter")
|
||||
# will be None for the parent most process started
|
||||
# manually by the user (the "registrar")
|
||||
self._spawn_method: str = spawn_method
|
||||
|
||||
# RPC state
|
||||
|
|
@ -907,7 +914,7 @@ class Actor:
|
|||
# TODO! -[ ] another `Struct` for rtvs..
|
||||
rvs: dict[str, Any] = spawnspec._runtime_vars
|
||||
if rvs['_debug_mode']:
|
||||
from .devx import (
|
||||
from ..devx import (
|
||||
enable_stack_on_sig,
|
||||
maybe_init_greenback,
|
||||
)
|
||||
|
|
@ -1656,7 +1663,7 @@ async def async_main(
|
|||
# TODO, just read direct from ipc_server?
|
||||
accept_addrs: list[UnwrappedAddress] = actor.accept_addrs
|
||||
|
||||
# Register with the arbiter if we're told its addr
|
||||
# Register with the registrar if we're told its addr
|
||||
log.runtime(
|
||||
f'Registering `{actor.name}` => {pformat(accept_addrs)}\n'
|
||||
# ^-TODO-^ we should instead show the maddr here^^
|
||||
|
|
@ -1880,153 +1887,8 @@ async def async_main(
|
|||
log.runtime(teardown_report)
|
||||
|
||||
|
||||
# TODO: rename to `Registry` and move to `.discovery._registry`!
|
||||
class Arbiter(Actor):
|
||||
'''
|
||||
A special registrar (and for now..) `Actor` who can contact all
|
||||
other actors within its immediate process tree and possibly keeps
|
||||
a registry of others meant to be discoverable in a distributed
|
||||
application. Normally the registrar is also the "root actor" and
|
||||
thus always has access to the top-most-level actor (process)
|
||||
nursery.
|
||||
|
||||
By default, the registrar is always initialized when and if no
|
||||
other registrar socket addrs have been specified to runtime
|
||||
init entry-points (such as `open_root_actor()` or
|
||||
`open_nursery()`). Any time a new main process is launched (and
|
||||
thus thus a new root actor created) and, no existing registrar
|
||||
can be contacted at the provided `registry_addr`, then a new
|
||||
one is always created; however, if one can be reached it is
|
||||
used.
|
||||
|
||||
Normally a distributed app requires at least registrar per
|
||||
logical host where for that given "host space" (aka localhost
|
||||
IPC domain of addresses) it is responsible for making all other
|
||||
host (local address) bound actors *discoverable* to external
|
||||
actor trees running on remote hosts.
|
||||
|
||||
'''
|
||||
is_arbiter = True
|
||||
|
||||
# TODO, implement this as a read on there existing a `._state` of
|
||||
# some sort setup by whenever we impl this all as
|
||||
# a `.discovery._registry.open_registry()` API
|
||||
def is_registry(self) -> bool:
|
||||
return self.is_arbiter
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*args,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
|
||||
self._registry: dict[
|
||||
tuple[str, str],
|
||||
UnwrappedAddress,
|
||||
] = {}
|
||||
self._waiters: dict[
|
||||
str,
|
||||
# either an event to sync to receiving an actor uid (which
|
||||
# is filled in once the actor has sucessfully registered),
|
||||
# or that uid after registry is complete.
|
||||
list[trio.Event | tuple[str, str]]
|
||||
] = {}
|
||||
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
async def find_actor(
|
||||
self,
|
||||
name: str,
|
||||
|
||||
) -> UnwrappedAddress|None:
|
||||
|
||||
for uid, addr in self._registry.items():
|
||||
if name in uid:
|
||||
return addr
|
||||
|
||||
return None
|
||||
|
||||
async def get_registry(
|
||||
self
|
||||
|
||||
) -> dict[str, UnwrappedAddress]:
|
||||
'''
|
||||
Return current name registry.
|
||||
|
||||
This method is async to allow for cross-actor invocation.
|
||||
|
||||
'''
|
||||
# NOTE: requires ``strict_map_key=False`` to the msgpack
|
||||
# unpacker since we have tuples as keys (not this makes the
|
||||
# arbiter suscetible to hashdos):
|
||||
# https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10
|
||||
return {
|
||||
'.'.join(key): val
|
||||
for key, val in self._registry.items()
|
||||
}
|
||||
|
||||
async def wait_for_actor(
|
||||
self,
|
||||
name: str,
|
||||
|
||||
) -> list[UnwrappedAddress]:
|
||||
'''
|
||||
Wait for a particular actor to register.
|
||||
|
||||
This is a blocking call if no actor by the provided name is currently
|
||||
registered.
|
||||
|
||||
'''
|
||||
addrs: list[UnwrappedAddress] = []
|
||||
addr: UnwrappedAddress
|
||||
|
||||
mailbox_info: str = 'Actor registry contact infos:\n'
|
||||
for uid, addr in self._registry.items():
|
||||
mailbox_info += (
|
||||
f'|_uid: {uid}\n'
|
||||
f'|_addr: {addr}\n\n'
|
||||
)
|
||||
if name == uid[0]:
|
||||
addrs.append(addr)
|
||||
|
||||
if not addrs:
|
||||
waiter = trio.Event()
|
||||
self._waiters.setdefault(name, []).append(waiter)
|
||||
await waiter.wait()
|
||||
|
||||
for uid in self._waiters[name]:
|
||||
if not isinstance(uid, trio.Event):
|
||||
addrs.append(self._registry[uid])
|
||||
|
||||
log.runtime(mailbox_info)
|
||||
return addrs
|
||||
|
||||
async def register_actor(
|
||||
self,
|
||||
uid: tuple[str, str],
|
||||
addr: UnwrappedAddress
|
||||
) -> None:
|
||||
uid = name, hash = (str(uid[0]), str(uid[1]))
|
||||
waddr: Address = wrap_address(addr)
|
||||
if not waddr.is_valid:
|
||||
# should never be 0-dynamic-os-alloc
|
||||
await debug.pause()
|
||||
|
||||
self._registry[uid] = addr
|
||||
|
||||
# pop and signal all waiter events
|
||||
events = self._waiters.pop(name, [])
|
||||
self._waiters.setdefault(name, []).append(uid)
|
||||
for event in events:
|
||||
if isinstance(event, trio.Event):
|
||||
event.set()
|
||||
|
||||
async def unregister_actor(
|
||||
self,
|
||||
uid: tuple[str, str]
|
||||
|
||||
) -> None:
|
||||
uid = (str(uid[0]), str(uid[1]))
|
||||
entry: tuple = self._registry.pop(uid, None)
|
||||
if entry is None:
|
||||
log.warning(f'Request to de-register {uid} failed?')
|
||||
# Backward compat: class moved to discovery._registry
|
||||
from ..discovery._registry import (
|
||||
Registrar as Registrar,
|
||||
)
|
||||
Arbiter = Registrar
|
||||
|
|
@ -25,6 +25,7 @@ from contextvars import (
|
|||
from pathlib import Path
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
Literal,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
|
@ -32,9 +33,14 @@ from typing import (
|
|||
import platformdirs
|
||||
from trio.lowlevel import current_task
|
||||
|
||||
from msgspec import (
|
||||
field,
|
||||
Struct,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._runtime import Actor
|
||||
from ._context import Context
|
||||
from .._context import Context
|
||||
|
||||
|
||||
# default IPC transport protocol settings
|
||||
|
|
@ -47,9 +53,70 @@ _def_tpt_proto: TransportProtocolKey = 'tcp'
|
|||
_current_actor: Actor|None = None # type: ignore # noqa
|
||||
_last_actor_terminated: Actor|None = None
|
||||
|
||||
|
||||
# TODO: mk this a `msgspec.Struct`!
|
||||
# -[ ] type out all fields obvi!
|
||||
# -[x] type out all fields obvi!
|
||||
# -[ ] (eventually) mk wire-ready for monitoring?
|
||||
class RuntimeVars(Struct):
|
||||
'''
|
||||
Actor-(and thus process)-global runtime state.
|
||||
|
||||
This struct is relayed from parent to child during sub-actor
|
||||
spawning and is a singleton instance per process.
|
||||
|
||||
Generally contains,
|
||||
- root-actor indicator.
|
||||
- comms-info: addrs for both (public) process/service-discovery
|
||||
and in-tree contact with other actors.
|
||||
- transport-layer IPC protocol server(s) settings.
|
||||
- debug-mode settings for enabling sync breakpointing and any
|
||||
surrounding REPL-fixture hooking.
|
||||
- infected-`asyncio` via guest-mode toggle(s)/cohfig.
|
||||
|
||||
'''
|
||||
_is_root: bool = False # bool
|
||||
_root_mailbox: tuple[str, str|int] = (None, None) # tuple[str|None, str|None]
|
||||
_root_addrs: list[
|
||||
tuple[str, str|int],
|
||||
] = [] # tuple[str|None, str|None]
|
||||
|
||||
# parent->chld ipc protocol caps
|
||||
_enable_tpts: list[TransportProtocolKey] = field(
|
||||
default_factory=lambda: [_def_tpt_proto],
|
||||
)
|
||||
|
||||
# registrar info
|
||||
_registry_addrs: list[tuple] = []
|
||||
|
||||
# `debug_mode: bool` settings
|
||||
_debug_mode: bool = False # bool
|
||||
repl_fixture: bool|Callable = False # |AbstractContextManager[bool]
|
||||
# for `tractor.pause_from_sync()` & `breakpoint()` support
|
||||
use_greenback: bool = False
|
||||
|
||||
# infected-`asyncio`-mode: `trio` running as guest.
|
||||
_is_infected_aio: bool = False
|
||||
|
||||
def __setattr__(
|
||||
self,
|
||||
key,
|
||||
val,
|
||||
) -> None:
|
||||
breakpoint()
|
||||
super().__setattr__(key, val)
|
||||
|
||||
def update(
|
||||
self,
|
||||
from_dict: dict|Struct,
|
||||
) -> None:
|
||||
for attr, val in from_dict.items():
|
||||
setattr(
|
||||
self,
|
||||
attr,
|
||||
val,
|
||||
)
|
||||
|
||||
|
||||
_runtime_vars: dict[str, Any] = {
|
||||
# root of actor-process tree info
|
||||
'_is_root': False, # bool
|
||||
|
|
@ -73,6 +140,23 @@ _runtime_vars: dict[str, Any] = {
|
|||
}
|
||||
|
||||
|
||||
def get_runtime_vars(
|
||||
as_dict: bool = True,
|
||||
) -> dict:
|
||||
'''
|
||||
Deliver a **copy** of the current `Actor`'s "runtime variables".
|
||||
|
||||
By default, for historical impl reasons, this delivers the `dict`
|
||||
form, but the `RuntimeVars` struct should be utilized as possible
|
||||
for future calls.
|
||||
|
||||
'''
|
||||
if as_dict:
|
||||
return dict(_runtime_vars)
|
||||
|
||||
return RuntimeVars(**_runtime_vars)
|
||||
|
||||
|
||||
def last_actor() -> Actor|None:
|
||||
'''
|
||||
Try to return last active `Actor` singleton
|
||||
|
|
@ -98,7 +182,7 @@ def current_actor(
|
|||
_current_actor is None
|
||||
):
|
||||
msg: str = 'No local actor has been initialized yet?\n'
|
||||
from ._exceptions import NoRuntime
|
||||
from .._exceptions import NoRuntime
|
||||
|
||||
if last := last_actor():
|
||||
msg += (
|
||||
|
|
@ -164,7 +248,7 @@ def current_ipc_ctx(
|
|||
not ctx
|
||||
and error_on_not_set
|
||||
):
|
||||
from ._exceptions import InternalError
|
||||
from .._exceptions import InternalError
|
||||
raise InternalError(
|
||||
'No IPC context has been allocated for this task yet?\n'
|
||||
f'|_{current_task()}\n'
|
||||
|
|
@ -30,36 +30,36 @@ import warnings
|
|||
import trio
|
||||
|
||||
|
||||
from .devx import (
|
||||
from ..devx import (
|
||||
debug,
|
||||
pformat as _pformat,
|
||||
)
|
||||
from ._addr import (
|
||||
from ..discovery._addr import (
|
||||
UnwrappedAddress,
|
||||
mk_uuid,
|
||||
)
|
||||
from ._state import current_actor, is_main_process
|
||||
from .log import get_logger, get_loglevel
|
||||
from ..log import get_logger, get_loglevel
|
||||
from ._runtime import Actor
|
||||
from ._portal import Portal
|
||||
from .trionics import (
|
||||
from ..trionics import (
|
||||
is_multi_cancelled,
|
||||
collapse_eg,
|
||||
)
|
||||
from ._exceptions import (
|
||||
from .._exceptions import (
|
||||
ContextCancelled,
|
||||
)
|
||||
from ._root import (
|
||||
from .._root import (
|
||||
open_root_actor,
|
||||
)
|
||||
from . import _state
|
||||
from . import _spawn
|
||||
from ..spawn import _spawn
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import multiprocessing as mp
|
||||
# from .ipc._server import IPCServer
|
||||
from .ipc import IPCServer
|
||||
# from ..ipc._server import IPCServer
|
||||
from ..ipc import IPCServer
|
||||
|
||||
|
||||
log = get_logger()
|
||||
|
|
@ -0,0 +1,26 @@
|
|||
# tractor: structured concurrent "actors".
|
||||
# Copyright 2018-eternity Tyler Goodlet.
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
Actor process spawning machinery using
|
||||
multiple backends (trio, multiprocessing).
|
||||
|
||||
NOTE: to avoid circular imports, this ``__init__``
|
||||
does NOT eagerly import submodules. Use direct
|
||||
module paths like ``tractor.spawn._spawn`` or
|
||||
``tractor.spawn._entry`` instead.
|
||||
|
||||
'''
|
||||
|
|
@ -29,19 +29,19 @@ from typing import (
|
|||
|
||||
import trio # type: ignore
|
||||
|
||||
from .log import (
|
||||
from ..log import (
|
||||
get_console_log,
|
||||
get_logger,
|
||||
)
|
||||
from . import _state
|
||||
from .devx import (
|
||||
from ..runtime import _state
|
||||
from ..devx import (
|
||||
_frame_stack,
|
||||
pformat,
|
||||
)
|
||||
# from .msg import pretty_struct
|
||||
from .to_asyncio import run_as_asyncio_guest
|
||||
from ._addr import UnwrappedAddress
|
||||
from ._runtime import (
|
||||
# from ..msg import pretty_struct
|
||||
from ..to_asyncio import run_as_asyncio_guest
|
||||
from ..discovery._addr import UnwrappedAddress
|
||||
from ..runtime._runtime import (
|
||||
async_main,
|
||||
Actor,
|
||||
)
|
||||
|
|
@ -125,7 +125,7 @@ class PatchedForkServer(ForkServer):
|
|||
self._forkserver_pid = None
|
||||
|
||||
# XXX only thing that changed!
|
||||
cmd = ('from tractor._forkserver_override import main; ' +
|
||||
cmd = ('from tractor.spawn._forkserver_override import main; ' +
|
||||
'main(%d, %d, %r, **%r)')
|
||||
|
||||
if self._preload_modules:
|
||||
|
|
@ -34,11 +34,11 @@ from typing import (
|
|||
import trio
|
||||
from trio import TaskStatus
|
||||
|
||||
from .devx import (
|
||||
from ..devx import (
|
||||
debug,
|
||||
pformat as _pformat
|
||||
)
|
||||
from tractor._state import (
|
||||
from tractor.runtime._state import (
|
||||
current_actor,
|
||||
is_main_process,
|
||||
is_root_process,
|
||||
|
|
@ -46,10 +46,10 @@ from tractor._state import (
|
|||
_runtime_vars,
|
||||
)
|
||||
from tractor.log import get_logger
|
||||
from tractor._addr import UnwrappedAddress
|
||||
from tractor._portal import Portal
|
||||
from tractor._runtime import Actor
|
||||
from tractor._entry import _mp_main
|
||||
from tractor.discovery._addr import UnwrappedAddress
|
||||
from tractor.runtime._portal import Portal
|
||||
from tractor.runtime._runtime import Actor
|
||||
from ._entry import _mp_main
|
||||
from tractor._exceptions import ActorFailure
|
||||
from tractor.msg import (
|
||||
types as msgtypes,
|
||||
|
|
@ -58,11 +58,11 @@ from tractor.msg import (
|
|||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ipc import (
|
||||
from tractor.ipc import (
|
||||
_server,
|
||||
Channel,
|
||||
)
|
||||
from ._supervise import ActorNursery
|
||||
from tractor.runtime._supervise import ActorNursery
|
||||
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
|
||||
|
||||
|
||||
|
|
@ -43,7 +43,7 @@ from tractor._exceptions import (
|
|||
AsyncioTaskExited,
|
||||
AsyncioCancelled,
|
||||
)
|
||||
from tractor._state import (
|
||||
from tractor.runtime._state import (
|
||||
debug_mode,
|
||||
_runtime_vars,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ from typing import (
|
|||
)
|
||||
|
||||
import trio
|
||||
from tractor._state import current_actor
|
||||
from tractor.runtime._state import current_actor
|
||||
from tractor.log import get_logger
|
||||
from ._tn import maybe_open_nursery
|
||||
# from ._beg import collapse_eg
|
||||
|
|
|
|||
6
uv.lock
6
uv.lock
|
|
@ -273,11 +273,11 @@ wheels = [
|
|||
|
||||
[[package]]
|
||||
name = "pygments"
|
||||
version = "2.19.2"
|
||||
version = "2.20.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/c3/b2/bc9c9196916376152d655522fdcebac55e66de6603a76a02bca1b6414f6c/pygments-2.20.0.tar.gz", hash = "sha256:6757cd03768053ff99f3039c1a36d6c0aa0b263438fcab17520b30a303a82b5f", size = 4955991, upload-time = "2026-03-29T13:29:33.898Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/f4/7e/a72dd26f3b0f4f2bf1dd8923c85f7ceb43172af56d63c7383eb62b332364/pygments-2.20.0-py3-none-any.whl", hash = "sha256:81a9e26dd42fd28a23a2d169d86d7ac03b46e2f8b59ed4698fb4785f946d0176", size = 1231151, upload-time = "2026-03-29T13:29:30.038Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
|||
Loading…
Reference in New Issue