Merge pull request #367 from goodboy/multihomed

Multihomed transport (server) addrs 🕶️
goodboy 2025-03-20 20:34:13 -04:00 committed by GitHub
commit 3d2b6613e8
26 changed files with 983 additions and 434 deletions
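
The headline API change is the move from a single `arbiter_addr`/`registry_addr` socket-address to a `registry_addrs: list[tuple[str, int]]` accepted throughout the public API (`open_root_actor()`, `open_nursery()`, `run_daemon()` and the discovery functions). A minimal usage sketch, assuming a localhost registrar on the default port 1616:

    import tractor
    import trio

    async def main():
        # pre-PR style, now deprecated (emits a `DeprecationWarning`):
        #   async with tractor.open_nursery(arbiter_addr=('127.0.0.1', 1616)):
        #
        # post-PR style: a *list* of registrar addrs, one per "home".
        async with tractor.open_nursery(
            registry_addrs=[('127.0.0.1', 1616)],
        ) as an:
            portal = await an.start_actor(
                'sub',
                enable_modules=[__name__],
            )
            await portal.cancel_actor()

    trio.run(main)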

pytest.ini 100644 (+8 lines)

@ -0,0 +1,8 @@
# vim: ft=ini
# pytest.ini for tractor
[pytest]
# don't show frickin captured logs AGAIN in the report..
addopts = --show-capture='no'
log_cli = false
; minversion = 6.0


@ -114,12 +114,18 @@ _reg_addr: tuple[str, int] = (
'127.0.0.1',
random.randint(1000, 9999),
)
_arb_addr = _reg_addr
@pytest.fixture(scope='session')
def arb_addr():
return _arb_addr
def reg_addr() -> tuple[str, int]:
# globally override the runtime to the per-test-session-dynamic
# addr so that all tests never conflict with any other actor
# tree using the default.
from tractor import _root
_root._default_lo_addrs = [_reg_addr]
return _reg_addr
def pytest_generate_tests(metafunc):
@ -161,30 +167,35 @@ def sig_prog(proc, sig):
def daemon(
loglevel: str,
testdir,
arb_addr: tuple[str, int],
reg_addr: tuple[str, int],
):
'''
Run a daemon actor as a "remote arbiter".
Run a daemon root actor as a separate actor-process tree and
"remote registrar" for discovery-protocol related tests.
'''
if loglevel in ('trace', 'debug'):
# too much logging will lock up the subproc (smh)
loglevel = 'info'
# XXX: too much logging will lock up the subproc (smh)
loglevel: str = 'info'
cmdargs = [
sys.executable, '-c',
"import tractor; tractor.run_daemon([], registry_addr={}, loglevel={})"
.format(
arb_addr,
"'{}'".format(loglevel) if loglevel else None)
code: str = (
"import tractor; "
"tractor.run_daemon([], registry_addrs={reg_addrs}, loglevel={ll})"
).format(
reg_addrs=str([reg_addr]),
ll="'{}'".format(loglevel) if loglevel else None,
)
cmd: list[str] = [
sys.executable,
'-c', code,
]
kwargs = dict()
kwargs = {}
if platform.system() == 'Windows':
# without this, tests hang on windows forever
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
proc = testdir.popen(
cmdargs,
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**kwargs,

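For a concrete (hypothetical) session address like `('127.0.0.1', 4242)` with `loglevel='info'`, the `code` string built above expands to the following one-liner which the subprocess executes via `python -c`:

    # what the `daemon` fixture's child process runs (expanded form):
    import tractor

    tractor.run_daemon(
        [],  # no extra rpc-enabled modules
        registry_addrs=[('127.0.0.1', 4242)],
        loglevel='info',
    )
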

@ -14,7 +14,7 @@ import tractor
from tractor._testing import (
tractor_test,
)
from .conftest import no_windows
from conftest import no_windows
def is_win():
@ -45,7 +45,7 @@ async def do_nuthin():
],
ids=['no_args', 'unexpected_args'],
)
def test_remote_error(arb_addr, args_err):
def test_remote_error(reg_addr, args_err):
'''
Verify an error raised in a subactor that is propagated
to the parent nursery, contains the underlying boxed builtin
@ -57,7 +57,7 @@ def test_remote_error(arb_addr, args_err):
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as nursery:
# on a remote type error caused by bad input args
@ -99,7 +99,7 @@ def test_remote_error(arb_addr, args_err):
assert exc.type == errtype
def test_multierror(arb_addr):
def test_multierror(reg_addr):
'''
Verify we raise a ``BaseExceptionGroup`` out of a nursery where
more than one actor errors.
@ -107,7 +107,7 @@ def test_multierror(arb_addr):
'''
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as nursery:
await nursery.run_in_actor(assert_err, name='errorer1')
@ -132,14 +132,14 @@ def test_multierror(arb_addr):
@pytest.mark.parametrize(
'num_subactors', range(25, 26),
)
def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay):
"""Verify we raise a ``BaseExceptionGroup`` out of a nursery where
more than one actor errors and also with a delay before failure
to test failure during an ongoing spawning.
"""
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as nursery:
for i in range(num_subactors):
@ -177,15 +177,20 @@ async def do_nothing():
@pytest.mark.parametrize('mechanism', ['nursery_cancel', KeyboardInterrupt])
def test_cancel_single_subactor(arb_addr, mechanism):
"""Ensure a ``ActorNursery.start_actor()`` spawned subactor
def test_cancel_single_subactor(reg_addr, mechanism):
'''
Ensure an ``ActorNursery.start_actor()`` spawned subactor
cancels when the nursery is cancelled.
"""
'''
async def spawn_actor():
"""Spawn an actor that blocks indefinitely.
"""
'''
Spawn an actor that blocks indefinitely then cancel via
either `ActorNursery.cancel()` or an exception raise.
'''
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as nursery:
portal = await nursery.start_actor(


@ -142,7 +142,7 @@ async def open_actor_local_nursery(
)
def test_actor_managed_trio_nursery_task_error_cancels_aio(
asyncio_mode: bool,
arb_addr
reg_addr: tuple,
):
'''
Verify that a ``trio`` nursery created managed in a child actor


@ -83,7 +83,7 @@ has_nested_actors = pytest.mark.has_nested_actors
def spawn(
start_method,
testdir,
arb_addr,
reg_addr,
) -> 'pexpect.spawn':
if start_method != 'trio':


@ -14,19 +14,19 @@ import trio
@tractor_test
async def test_reg_then_unreg(arb_addr):
async def test_reg_then_unreg(reg_addr):
actor = tractor.current_actor()
assert actor.is_arbiter
assert len(actor._registry) == 1 # only self is registered
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as n:
portal = await n.start_actor('actor', enable_modules=[__name__])
uid = portal.channel.uid
async with tractor.get_arbiter(*arb_addr) as aportal:
async with tractor.get_arbiter(*reg_addr) as aportal:
# this local actor should be the arbiter
assert actor is aportal.actor
@ -52,15 +52,27 @@ async def hi():
return the_line.format(tractor.current_actor().name)
async def say_hello(other_actor):
async def say_hello(
other_actor: str,
reg_addr: tuple[str, int],
):
await trio.sleep(1) # wait for other actor to spawn
async with tractor.find_actor(other_actor) as portal:
async with tractor.find_actor(
other_actor,
registry_addrs=[reg_addr],
) as portal:
assert portal is not None
return await portal.run(__name__, 'hi')
async def say_hello_use_wait(other_actor):
async with tractor.wait_for_actor(other_actor) as portal:
async def say_hello_use_wait(
other_actor: str,
reg_addr: tuple[str, int],
):
async with tractor.wait_for_actor(
other_actor,
registry_addr=reg_addr,
) as portal:
assert portal is not None
result = await portal.run(__name__, 'hi')
return result
@ -68,21 +80,29 @@ async def say_hello_use_wait(other_actor):
@tractor_test
@pytest.mark.parametrize('func', [say_hello, say_hello_use_wait])
async def test_trynamic_trio(func, start_method, arb_addr):
"""Main tractor entry point, the "master" process (for now
acts as the "director").
"""
async def test_trynamic_trio(
func,
start_method,
reg_addr,
):
'''
Root actor acting as the "director" and running one-shot-task-actors
for the directed subs.
'''
async with tractor.open_nursery() as n:
print("Alright... Action!")
donny = await n.run_in_actor(
func,
other_actor='gretchen',
reg_addr=reg_addr,
name='donny',
)
gretchen = await n.run_in_actor(
func,
other_actor='donny',
reg_addr=reg_addr,
name='gretchen',
)
print(await gretchen.result())
@ -130,7 +150,7 @@ async def unpack_reg(actor_or_portal):
async def spawn_and_check_registry(
arb_addr: tuple,
reg_addr: tuple,
use_signal: bool,
remote_arbiter: bool = False,
with_streaming: bool = False,
@ -138,9 +158,9 @@ async def spawn_and_check_registry(
) -> None:
async with tractor.open_root_actor(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
):
async with tractor.get_arbiter(*arb_addr) as portal:
async with tractor.get_arbiter(*reg_addr) as portal:
# runtime needs to be up to call this
actor = tractor.current_actor()
@ -212,17 +232,19 @@ async def spawn_and_check_registry(
def test_subactors_unregister_on_cancel(
start_method,
use_signal,
arb_addr,
reg_addr,
with_streaming,
):
"""Verify that cancelling a nursery results in all subactors
'''
Verify that cancelling a nursery results in all subactors
deregistering themselves with the arbiter.
"""
'''
with pytest.raises(KeyboardInterrupt):
trio.run(
partial(
spawn_and_check_registry,
arb_addr,
reg_addr,
use_signal,
remote_arbiter=False,
with_streaming=with_streaming,
@ -236,7 +258,7 @@ def test_subactors_unregister_on_cancel_remote_daemon(
daemon,
start_method,
use_signal,
arb_addr,
reg_addr,
with_streaming,
):
"""Verify that cancelling a nursery results in all subactors
@ -247,7 +269,7 @@ def test_subactors_unregister_on_cancel_remote_daemon(
trio.run(
partial(
spawn_and_check_registry,
arb_addr,
reg_addr,
use_signal,
remote_arbiter=True,
with_streaming=with_streaming,
@ -261,7 +283,7 @@ async def streamer(agen):
async def close_chans_before_nursery(
arb_addr: tuple,
reg_addr: tuple,
use_signal: bool,
remote_arbiter: bool = False,
) -> None:
@ -274,9 +296,9 @@ async def close_chans_before_nursery(
entries_at_end = 1
async with tractor.open_root_actor(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
):
async with tractor.get_arbiter(*arb_addr) as aportal:
async with tractor.get_arbiter(*reg_addr) as aportal:
try:
get_reg = partial(unpack_reg, aportal)
@ -328,7 +350,7 @@ async def close_chans_before_nursery(
def test_close_channel_explicit(
start_method,
use_signal,
arb_addr,
reg_addr,
):
"""Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
@ -338,7 +360,7 @@ def test_close_channel_explicit(
trio.run(
partial(
close_chans_before_nursery,
arb_addr,
reg_addr,
use_signal,
remote_arbiter=False,
),
@ -350,7 +372,7 @@ def test_close_channel_explicit_remote_arbiter(
daemon,
start_method,
use_signal,
arb_addr,
reg_addr,
):
"""Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
@ -360,7 +382,7 @@ def test_close_channel_explicit_remote_arbiter(
trio.run(
partial(
close_chans_before_nursery,
arb_addr,
reg_addr,
use_signal,
remote_arbiter=True,
),


@ -47,7 +47,7 @@ async def trio_cancels_single_aio_task():
await tractor.to_asyncio.run_task(sleep_forever)
def test_trio_cancels_aio_on_actor_side(arb_addr):
def test_trio_cancels_aio_on_actor_side(reg_addr):
'''
Spawn an infected actor that is cancelled by the ``trio`` side
task using std cancel scope apis.
@ -55,7 +55,7 @@ def test_trio_cancels_aio_on_actor_side(arb_addr):
'''
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr
registry_addrs=[reg_addr]
) as n:
await n.run_in_actor(
trio_cancels_single_aio_task,
@ -94,7 +94,7 @@ async def asyncio_actor(
raise
def test_aio_simple_error(arb_addr):
def test_aio_simple_error(reg_addr):
'''
Verify a simple remote asyncio error propagates back through trio
to the parent actor.
@ -103,7 +103,7 @@ def test_aio_simple_error(arb_addr):
'''
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr
registry_addrs=[reg_addr]
) as n:
await n.run_in_actor(
asyncio_actor,
@ -131,7 +131,7 @@ def test_aio_simple_error(arb_addr):
assert err.type == AssertionError
def test_tractor_cancels_aio(arb_addr):
def test_tractor_cancels_aio(reg_addr):
'''
Verify we can cancel a spawned asyncio task gracefully.
@ -150,7 +150,7 @@ def test_tractor_cancels_aio(arb_addr):
trio.run(main)
def test_trio_cancels_aio(arb_addr):
def test_trio_cancels_aio(reg_addr):
'''
Much like the above test with ``tractor.Portal.cancel_actor()``
except we just use a standard ``trio`` cancellation api.
@ -206,7 +206,7 @@ async def trio_ctx(
ids='parent_actor_cancels_child={}'.format
)
def test_context_spawns_aio_task_that_errors(
arb_addr,
reg_addr,
parent_cancels: bool,
):
'''
@ -288,7 +288,7 @@ async def aio_cancel():
await sleep_forever()
def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr):
def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
async def main():
async with tractor.open_nursery() as n:
@ -436,7 +436,7 @@ async def stream_from_aio(
'fan_out', [False, True],
ids='fan_out_w_chan_subscribe={}'.format
)
def test_basic_interloop_channel_stream(arb_addr, fan_out):
def test_basic_interloop_channel_stream(reg_addr, fan_out):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
@ -450,7 +450,7 @@ def test_basic_interloop_channel_stream(arb_addr, fan_out):
# TODO: parametrize the above test and avoid the duplication here?
def test_trio_error_cancels_intertask_chan(arb_addr):
def test_trio_error_cancels_intertask_chan(reg_addr):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
@ -469,7 +469,7 @@ def test_trio_error_cancels_intertask_chan(arb_addr):
assert exc.type == Exception
def test_trio_closes_early_and_channel_exits(arb_addr):
def test_trio_closes_early_and_channel_exits(reg_addr):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
@ -484,7 +484,7 @@ def test_trio_closes_early_and_channel_exits(arb_addr):
trio.run(main)
def test_aio_errors_and_channel_propagates_and_closes(arb_addr):
def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
@ -561,7 +561,7 @@ async def trio_to_aio_echo_server(
ids='raise_error={}'.format,
)
def test_echoserver_detailed_mechanics(
arb_addr,
reg_addr,
raise_error_mid_stream,
):


@ -939,6 +939,7 @@ async def tell_little_bro(
def test_peer_spawns_and_cancels_service_subactor(
debug_mode: bool,
raise_client_error: str,
reg_addr: tuple[str, int],
):
# NOTE: this tests for the modden `mod wks open piker` bug
# discovered as part of implementing workspace ctx
@ -956,6 +957,7 @@ def test_peer_spawns_and_cancels_service_subactor(
async with tractor.open_nursery(
# NOTE: to halt the peer tasks on ctxc, uncomment this.
debug_mode=debug_mode,
registry_addrs=[reg_addr],
) as an:
server: Portal = await an.start_actor(
(server_name := 'spawn_server'),


@ -58,7 +58,7 @@ async def context_stream(
async def stream_from_single_subactor(
arb_addr,
reg_addr,
start_method,
stream_func,
):
@ -67,7 +67,7 @@ async def stream_from_single_subactor(
# only one per host address, spawns an actor if None
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
start_method=start_method,
) as nursery:
@ -118,13 +118,13 @@ async def stream_from_single_subactor(
@pytest.mark.parametrize(
'stream_func', [async_gen_stream, context_stream]
)
def test_stream_from_single_subactor(arb_addr, start_method, stream_func):
def test_stream_from_single_subactor(reg_addr, start_method, stream_func):
"""Verify streaming from a spawned async generator.
"""
trio.run(
partial(
stream_from_single_subactor,
arb_addr,
reg_addr,
start_method,
stream_func=stream_func,
),
@ -228,14 +228,14 @@ async def a_quadruple_example():
return result_stream
async def cancel_after(wait, arb_addr):
async with tractor.open_root_actor(arbiter_addr=arb_addr):
async def cancel_after(wait, reg_addr):
async with tractor.open_root_actor(registry_addrs=[reg_addr]):
with trio.move_on_after(wait):
return await a_quadruple_example()
@pytest.fixture(scope='module')
def time_quad_ex(arb_addr, ci_env, spawn_backend):
def time_quad_ex(reg_addr, ci_env, spawn_backend):
if spawn_backend == 'mp':
"""no idea but the mp *nix runs are flaking out here often...
"""
@ -243,7 +243,7 @@ def time_quad_ex(arb_addr, ci_env, spawn_backend):
timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
start = time.time()
results = trio.run(cancel_after, timeout, arb_addr)
results = trio.run(cancel_after, timeout, reg_addr)
diff = time.time() - start
assert results
return results, diff
@ -263,14 +263,14 @@ def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
list(map(lambda i: i/10, range(3, 9)))
)
def test_not_fast_enough_quad(
arb_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend
reg_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend
):
"""Verify we can cancel midway through the quad example and all actors
cancel gracefully.
"""
results, diff = time_quad_ex
delay = max(diff - cancel_delay, 0)
results = trio.run(cancel_after, delay, arb_addr)
results = trio.run(cancel_after, delay, reg_addr)
system = platform.system()
if system in ('Windows', 'Darwin') and results is not None:
# In CI environments it seems later runs are quicker than the first
@ -283,7 +283,7 @@ def test_not_fast_enough_quad(
@tractor_test
async def test_respawn_consumer_task(
arb_addr,
reg_addr,
spawn_backend,
loglevel,
):


@ -24,7 +24,7 @@ async def test_no_runtime():
@tractor_test
async def test_self_is_registered(arb_addr):
async def test_self_is_registered(reg_addr):
"Verify waiting on the arbiter to register itself using the standard api."
actor = tractor.current_actor()
assert actor.is_arbiter
@ -34,20 +34,20 @@ async def test_self_is_registered(arb_addr):
@tractor_test
async def test_self_is_registered_localportal(arb_addr):
async def test_self_is_registered_localportal(reg_addr):
"Verify waiting on the arbiter to register itself using a local portal."
actor = tractor.current_actor()
assert actor.is_arbiter
async with tractor.get_arbiter(*arb_addr) as portal:
async with tractor.get_arbiter(*reg_addr) as portal:
assert isinstance(portal, tractor._portal.LocalPortal)
with trio.fail_after(0.2):
sockaddr = await portal.run_from_ns(
'self', 'wait_for_actor', name='root')
assert sockaddr[0] == arb_addr
assert sockaddr[0] == reg_addr
def test_local_actor_async_func(arb_addr):
def test_local_actor_async_func(reg_addr):
"""Verify a simple async function in-process.
"""
nums = []
@ -55,7 +55,7 @@ def test_local_actor_async_func(arb_addr):
async def print_loop():
async with tractor.open_root_actor(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
):
# arbiter is started in-proc if dne
assert tractor.current_actor().is_arbiter


@ -30,9 +30,9 @@ def test_abort_on_sigint(daemon):
@tractor_test
async def test_cancel_remote_arbiter(daemon, arb_addr):
async def test_cancel_remote_arbiter(daemon, reg_addr):
assert not tractor.current_actor().is_arbiter
async with tractor.get_arbiter(*arb_addr) as portal:
async with tractor.get_arbiter(*reg_addr) as portal:
await portal.cancel_actor()
time.sleep(0.1)
@ -41,16 +41,16 @@ async def test_cancel_remote_arbiter(daemon, arb_addr):
# no arbiter socket should exist
with pytest.raises(OSError):
async with tractor.get_arbiter(*arb_addr) as portal:
async with tractor.get_arbiter(*reg_addr) as portal:
pass
def test_register_duplicate_name(daemon, arb_addr):
def test_register_duplicate_name(daemon, reg_addr):
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as n:
assert not tractor.current_actor().is_arbiter


@ -159,7 +159,7 @@ async def test_required_args(callwith_expecterror):
)
def test_multi_actor_subs_arbiter_pub(
loglevel,
arb_addr,
reg_addr,
pub_actor,
):
"""Try out the neato @pub decorator system.
@ -169,7 +169,7 @@ def test_multi_actor_subs_arbiter_pub(
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
enable_modules=[__name__],
) as n:
@ -254,12 +254,12 @@ def test_multi_actor_subs_arbiter_pub(
def test_single_subactor_pub_multitask_subs(
loglevel,
arb_addr,
reg_addr,
):
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
enable_modules=[__name__],
) as n:


@ -52,7 +52,7 @@ async def short_sleep():
'fail_on_syntax',
],
)
def test_rpc_errors(arb_addr, to_call, testdir):
def test_rpc_errors(reg_addr, to_call, testdir):
"""Test errors when making various RPC requests to an actor
that either doesn't have the requested module exposed or doesn't define
the named function.
@ -84,7 +84,7 @@ def test_rpc_errors(arb_addr, to_call, testdir):
# spawn a subactor which calls us back
async with tractor.open_nursery(
arbiter_addr=arb_addr,
arbiter_addr=reg_addr,
enable_modules=exposed_mods.copy(),
) as n:


@ -16,14 +16,14 @@ data_to_pass_down = {'doggy': 10, 'kitty': 4}
async def spawn(
is_arbiter: bool,
data: dict,
arb_addr: tuple[str, int],
reg_addr: tuple[str, int],
):
namespaces = [__name__]
await trio.sleep(0.1)
async with tractor.open_root_actor(
arbiter_addr=arb_addr,
arbiter_addr=reg_addr,
):
actor = tractor.current_actor()
@ -41,7 +41,7 @@ async def spawn(
is_arbiter=False,
name='sub-actor',
data=data,
arb_addr=arb_addr,
reg_addr=reg_addr,
enable_modules=namespaces,
)
@ -55,12 +55,12 @@ async def spawn(
return 10
def test_local_arbiter_subactor_global_state(arb_addr):
def test_local_arbiter_subactor_global_state(reg_addr):
result = trio.run(
spawn,
True,
data_to_pass_down,
arb_addr,
reg_addr,
)
assert result == 10
@ -140,7 +140,7 @@ async def check_loglevel(level):
def test_loglevel_propagated_to_subactor(
start_method,
capfd,
arb_addr,
reg_addr,
):
if start_method == 'mp_forkserver':
pytest.skip(
@ -152,7 +152,7 @@ def test_loglevel_propagated_to_subactor(
async with tractor.open_nursery(
name='arbiter',
start_method=start_method,
arbiter_addr=arb_addr,
arbiter_addr=reg_addr,
) as tn:
await tn.run_in_actor(


@ -66,13 +66,13 @@ async def ensure_sequence(
async def open_sequence_streamer(
sequence: list[int],
arb_addr: tuple[str, int],
reg_addr: tuple[str, int],
start_method: str,
) -> tractor.MsgStream:
async with tractor.open_nursery(
arbiter_addr=arb_addr,
arbiter_addr=reg_addr,
start_method=start_method,
) as tn:
@ -93,7 +93,7 @@ async def open_sequence_streamer(
def test_stream_fan_out_to_local_subscriptions(
arb_addr,
reg_addr,
start_method,
):
@ -103,7 +103,7 @@ def test_stream_fan_out_to_local_subscriptions(
async with open_sequence_streamer(
sequence,
arb_addr,
reg_addr,
start_method,
) as stream:
@ -138,7 +138,7 @@ def test_stream_fan_out_to_local_subscriptions(
]
)
def test_consumer_and_parent_maybe_lag(
arb_addr,
reg_addr,
start_method,
task_delays,
):
@ -150,7 +150,7 @@ def test_consumer_and_parent_maybe_lag(
async with open_sequence_streamer(
sequence,
arb_addr,
reg_addr,
start_method,
) as stream:
@ -211,7 +211,7 @@ def test_consumer_and_parent_maybe_lag(
def test_faster_task_to_recv_is_cancelled_by_slower(
arb_addr,
reg_addr,
start_method,
):
'''
@ -225,7 +225,7 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
async with open_sequence_streamer(
sequence,
arb_addr,
reg_addr,
start_method,
) as stream:
@ -302,7 +302,7 @@ def test_subscribe_errors_after_close():
def test_ensure_slow_consumers_lag_out(
arb_addr,
reg_addr,
start_method,
):
'''This is a pure local task test; no tractor


@ -18,74 +18,48 @@
tractor: structured concurrent ``trio``-"actors".
"""
from ._clustering import open_actor_cluster
from ._clustering import (
open_actor_cluster as open_actor_cluster,
)
from ._context import (
Context, # the type
context, # a func-decorator
Context as Context, # the type
context as context, # a func-decorator
)
from ._streaming import (
MsgStream,
stream,
MsgStream as MsgStream,
stream as stream,
)
from ._discovery import (
get_arbiter,
find_actor,
wait_for_actor,
query_actor,
get_arbiter as get_arbiter,
find_actor as find_actor,
wait_for_actor as wait_for_actor,
query_actor as query_actor,
)
from ._supervise import (
open_nursery as open_nursery,
ActorNursery as ActorNursery,
)
from ._supervise import open_nursery
from ._state import (
current_actor,
is_root_process,
current_actor as current_actor,
is_root_process as is_root_process,
)
from ._exceptions import (
RemoteActorError,
ModuleNotExposed,
ContextCancelled,
RemoteActorError as RemoteActorError,
ModuleNotExposed as ModuleNotExposed,
ContextCancelled as ContextCancelled,
)
from .devx import (
breakpoint,
pause,
pause_from_sync,
post_mortem,
breakpoint as breakpoint,
pause as pause,
pause_from_sync as pause_from_sync,
post_mortem as post_mortem,
)
from . import msg
from . import msg as msg
from ._root import (
run_daemon,
open_root_actor,
run_daemon as run_daemon,
open_root_actor as open_root_actor,
)
from ._ipc import Channel
from ._portal import Portal
from ._runtime import Actor
__all__ = [
'Actor',
'BaseExceptionGroup',
'Channel',
'Context',
'ContextCancelled',
'ModuleNotExposed',
'MsgStream',
'Portal',
'RemoteActorError',
'breakpoint',
'context',
'current_actor',
'find_actor',
'query_actor',
'get_arbiter',
'is_root_process',
'msg',
'open_actor_cluster',
'open_nursery',
'open_root_actor',
'pause',
'post_mortem',
'pause_from_sync',
'query_actor',
'run_daemon',
'stream',
'to_asyncio',
'wait_for_actor',
]
from ._ipc import Channel as Channel
from ._portal import Portal as Portal
from ._runtime import Actor as Actor


@ -15,32 +15,45 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Actor discovery API.
Discovery (protocols) API for automatic addressing and location
management of (service) actors.
"""
from __future__ import annotations
from typing import (
Optional,
Union,
AsyncGenerator,
AsyncContextManager,
TYPE_CHECKING,
)
from contextlib import asynccontextmanager as acm
import warnings
from .trionics import gather_contexts
from ._ipc import _connect_chan, Channel
from ._portal import (
Portal,
open_portal,
LocalPortal,
)
from ._state import current_actor, _runtime_vars
from ._state import (
current_actor,
_runtime_vars,
)
if TYPE_CHECKING:
from ._runtime import Actor
@acm
async def get_arbiter(
async def get_registry(
host: str,
port: int,
) -> AsyncGenerator[Union[Portal, LocalPortal], None]:
) -> AsyncGenerator[
Portal | LocalPortal | None,
None,
]:
'''
Return a portal instance connected to a local or remote
arbiter.
@ -51,16 +64,33 @@ async def get_arbiter(
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
if actor.is_arbiter:
if actor.is_registrar:
# we're already the arbiter
# (likely a re-entrant call from the arbiter actor)
yield LocalPortal(actor, Channel((host, port)))
yield LocalPortal(
actor,
Channel((host, port))
)
else:
async with _connect_chan(host, port) as chan:
async with (
_connect_chan(host, port) as chan,
open_portal(chan) as regstr_ptl,
):
yield regstr_ptl
async with open_portal(chan) as arb_portal:
yield arb_portal
# TODO: deprecate and this remove _arbiter form!
@acm
async def get_arbiter(*args, **kwargs):
warnings.warn(
'`tractor.get_arbiter()` is now deprecated!\n'
'Use `.get_registry()` instead!',
DeprecationWarning,
stacklevel=2,
)
async with get_registry(*args, **kwargs) as to_yield:
yield to_yield
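
The new entry point is call-compatible with the old one; a sketch assuming a registrar listening on the default localhost address:

    from tractor._discovery import get_registry

    async def lookup(name: str):
        # yields a `LocalPortal` when the calling actor IS the
        # registrar, otherwise a remote `Portal` over a fresh channel
        async with get_registry('127.0.0.1', 1616) as reg_portal:
            return await reg_portal.run_from_ns(
                'self',
                'find_actor',
                name=name,
            )

Calling the old `get_arbiter()` still works but now routes through this shim and emits a `DeprecationWarning`.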
@acm
@ -68,51 +98,80 @@ async def get_root(
**kwargs,
) -> AsyncGenerator[Portal, None]:
# TODO: rename mailbox to `_root_maddr` when we finally
# add and impl libp2p multi-addrs?
host, port = _runtime_vars['_root_mailbox']
assert host is not None
async with _connect_chan(host, port) as chan:
async with open_portal(chan, **kwargs) as portal:
yield portal
async with (
_connect_chan(host, port) as chan,
open_portal(chan, **kwargs) as portal,
):
yield portal
@acm
async def query_actor(
name: str,
arbiter_sockaddr: Optional[tuple[str, int]] = None,
arbiter_sockaddr: tuple[str, int] | None = None,
regaddr: tuple[str, int] | None = None,
) -> AsyncGenerator[tuple[str, int], None]:
) -> AsyncGenerator[
tuple[str, int] | None,
None,
]:
'''
Simple address lookup for a given actor name.
Make a transport address lookup for an actor name to a specific
registrar.
Returns the (socket) address or ``None``.
Returns the (socket) address or ``None`` if no entry under that
name exists for the given registrar listening @ `regaddr`.
'''
actor = current_actor()
async with get_arbiter(
*arbiter_sockaddr or actor._arb_addr
) as arb_portal:
actor: Actor = current_actor()
if (
name == 'registrar'
and actor.is_registrar
):
raise RuntimeError(
'The current actor IS the registry!?'
)
sockaddr = await arb_portal.run_from_ns(
if arbiter_sockaddr is not None:
warnings.warn(
'`tractor.query_actor(arbiter_sockaddr=<blah>)` is deprecated.\n'
'Use `regaddr: tuple[str, int]` instead!',
DeprecationWarning,
stacklevel=2,
)
regaddr: tuple[str, int] = arbiter_sockaddr
reg_portal: Portal
regaddr: tuple[str, int] = regaddr or actor.reg_addrs[0]
async with get_registry(*regaddr) as reg_portal:
# TODO: return portals to all available actors - for now
# just the last one that registered
sockaddr: tuple[str, int] = await reg_portal.run_from_ns(
'self',
'find_actor',
name=name,
)
# TODO: return portals to all available actors - for now just
# the last one that registered
if name == 'arbiter' and actor.is_arbiter:
raise RuntimeError("The current actor is the arbiter")
yield sockaddr if sockaddr else None
yield sockaddr
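
A one-shot lookup against a specific registrar then looks like the following sketch (the service name and addr are made-up examples):

    from tractor._discovery import query_actor

    async def maybe_sockaddr() -> tuple[str, int] | None:
        # yields the last-registered sockaddr for `name`, or `None`
        # if the registrar has no entry under that name
        async with query_actor(
            name='some_service',
            regaddr=('127.0.0.1', 1616),
        ) as sockaddr:
            return sockaddr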
@acm
async def find_actor(
name: str,
arbiter_sockaddr: tuple[str, int] | None = None
arbiter_sockaddr: tuple[str, int]|None = None,
registry_addrs: list[tuple[str, int]]|None = None,
) -> AsyncGenerator[Optional[Portal], None]:
only_first: bool = True,
raise_on_none: bool = False,
) -> AsyncGenerator[
Portal | list[Portal] | None,
None,
]:
'''
Ask the arbiter to find actor(s) by name.
@ -120,24 +179,83 @@ async def find_actor(
known to the arbiter.
'''
async with query_actor(
name=name,
arbiter_sockaddr=arbiter_sockaddr,
) as sockaddr:
if arbiter_sockaddr is not None:
warnings.warn(
'`tractor.find_actor(arbiter_sockaddr=<blah>)` is deprecated.\n'
'Use `registry_addrs: list[tuple]` instead!',
DeprecationWarning,
stacklevel=2,
)
registry_addrs: list[tuple[str, int]] = [arbiter_sockaddr]
if sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
@acm
async def maybe_open_portal_from_reg_addr(
addr: tuple[str, int],
):
async with query_actor(
name=name,
regaddr=addr,
) as sockaddr:
if sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None
if not registry_addrs:
# XXX NOTE: make sure to dynamically read the value on
# every call since something may change it globally (eg.
# like in our discovery test suite)!
from . import _root
registry_addrs = (
_runtime_vars['_registry_addrs']
or
_root._default_lo_addrs
)
maybe_portals: list[
AsyncContextManager[tuple[str, int]]
] = list(
maybe_open_portal_from_reg_addr(addr)
for addr in registry_addrs
)
async with gather_contexts(
mngrs=maybe_portals,
) as portals:
# log.runtime(
# 'Gathered portals:\n'
# f'{portals}'
# )
# NOTE: `gather_contexts()` will return a
# `tuple[None, None, ..., None]` if no contact
# can be made with any registrar at any of the
# N provided addrs!
if not any(portals):
if raise_on_none:
raise RuntimeError(
f'No actor "{name}" found registered @ {registry_addrs}'
)
yield None
return
portals: list[Portal] = list(portals)
if only_first:
yield portals[0]
else:
# TODO: currently this may return multiple portals
# given there are multi-homed or multiple registrars..
# SO, we probably need de-duplication logic?
yield portals
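
So with multiple registrars the default is "first portal wins"; a hedged sketch of the new multi-homed lookup (the second addr and `some_fn` are made-up stand-ins):

    async def locate(name: str):
        async with tractor.find_actor(
            name,
            registry_addrs=[
                ('127.0.0.1', 1616),
                ('10.0.0.2', 1616),   # hypothetical second home
            ],
            # only_first=False,   # yield `list[Portal]` instead
            # raise_on_none=True, # error instead of yielding `None`
        ) as portal:
            if portal is not None:
                return await portal.run(__name__, 'some_fn')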
@acm
async def wait_for_actor(
name: str,
arbiter_sockaddr: tuple[str, int] | None = None,
# registry_addr: tuple[str, int] | None = None,
registry_addr: tuple[str, int] | None = None,
) -> AsyncGenerator[Portal, None]:
'''
@ -146,17 +264,31 @@ async def wait_for_actor(
A portal to the first registered actor is returned.
'''
actor = current_actor()
actor: Actor = current_actor()
async with get_arbiter(
*arbiter_sockaddr or actor._arb_addr,
) as arb_portal:
sockaddrs = await arb_portal.run_from_ns(
if arbiter_sockaddr is not None:
warnings.warn(
'`tractor.wait_for_actor(arbiter_sockaddr=<foo>)` is deprecated.\n'
'Use `registry_addr: tuple` instead!',
DeprecationWarning,
stacklevel=2,
)
registry_addr: tuple[str, int] = arbiter_sockaddr
# TODO: use `.trionics.gather_contexts()` like
# above in `find_actor()` as well?
reg_portal: Portal
regaddr: tuple[str, int] = registry_addr or actor.reg_addrs[0]
async with get_registry(*regaddr) as reg_portal:
sockaddrs = await reg_portal.run_from_ns(
'self',
'wait_for_actor',
name=name,
)
sockaddr = sockaddrs[-1]
# get latest registered addr by default?
# TODO: offer multi-portal yields in multi-homed case?
sockaddr: tuple[str, int] = sockaddrs[-1]
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:


@ -47,8 +47,8 @@ log = get_logger(__name__)
def _mp_main(
actor: Actor, # type: ignore
accept_addr: tuple[str, int],
actor: Actor,
accept_addrs: list[tuple[str, int]],
forkserver_info: tuple[Any, Any, Any, Any, Any],
start_method: SpawnMethodKey,
parent_addr: tuple[str, int] | None = None,
@ -77,8 +77,8 @@ def _mp_main(
log.debug(f"parent_addr is {parent_addr}")
trio_main = partial(
async_main,
actor,
accept_addr,
actor=actor,
accept_addrs=accept_addrs,
parent_addr=parent_addr
)
try:
@ -96,7 +96,7 @@ def _mp_main(
def _trio_main(
actor: Actor, # type: ignore
actor: Actor,
*,
parent_addr: tuple[str, int] | None = None,
infect_asyncio: bool = False,


@ -517,7 +517,9 @@ class Channel:
@acm
async def _connect_chan(
host: str, port: int
host: str,
port: int
) -> typing.AsyncGenerator[Channel, None]:
'''
Create and connect a channel with disconnect on context manager


@ -0,0 +1,151 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Multiaddress parser and utils according to the spec(s) defined by
`libp2p` and used in dependent projects such as `ipfs`:
- https://docs.libp2p.io/concepts/fundamentals/addressing/
- https://github.com/libp2p/specs/blob/master/addressing/README.md
'''
from typing import Iterator
from bidict import bidict
# TODO: see if we can leverage libp2p ecosys projects instead of
# rolling our own (parser) impls of the above addressing specs:
# - https://github.com/libp2p/py-libp2p
# - https://docs.libp2p.io/concepts/nat/circuit-relay/#relay-addresses
# prots: bidict[int, str] = bidict({
prots: dict[str, int] = {
'ipv4': 3,
'ipv6': 3,
'wg': 3,
'tcp': 4,
'udp': 4,
# TODO: support the next-gen shite Bo
# 'quic': 4,
# 'ssh': 7, # via rsyscall bootstrapping
}
prot_params: dict[str, tuple[str, ...]] = {
'ipv4': ('addr',),
'ipv6': ('addr',),
'wg': ('addr', 'port', 'pubkey'),
'tcp': ('port',),
'udp': ('port',),
# 'quic': ('port',),
# 'ssh': ('port',),
}
def iter_prot_layers(
multiaddr: str,
) -> Iterator[
tuple[
str,
list[str]
]
]:
'''
Unpack a libp2p style "multiaddress" into multiple "segments"
for each "layer" of the protocoll stack (in OSI terms).
'''
tokens: list[str] = multiaddr.split('/')
root, tokens = tokens[0], tokens[1:]
assert not root # there is a root '/' on LHS
itokens = iter(tokens)
prot: str | None = None
params: list[str] = []
for token in itokens:
# every prot path should start with a known
# key-str.
if token in prots:
if prot is None:
prot: str = token
else:
yield prot, params
prot = token
params = []
elif token not in prots:
params.append(token)
else:
yield prot, params
def parse_maddr(
multiaddr: str,
) -> dict[str, str | int | dict]:
'''
Parse a libp2p style "multiaddress" into its distinct protocol
segments where each segment is of the form:
`../<protocol>/<param0>/<param1>/../<paramN>`
and is loaded into a (order preserving) `layers: dict[str,
dict[str, Any]` which holds each protocol-layer-segment of the
original `str` path as a separate entry according to its approx
OSI "layer number".
Any `paramN` in the path must be distinctly defined by a str-token in the
(module global) `prot_params` table.
For eg. for wireguard which requires an address, port number and publickey
the protocol params are specified as the entry:
'wg': ('addr', 'port', 'pubkey'),
and are thus parsed from a maddr in that order:
`'/wg/1.1.1.1/51820/<pubkey>'`
'''
layers: dict[str, str | int | dict] = {}
for (
prot_key,
params,
) in iter_prot_layers(multiaddr):
layer: int = prots[prot_key] # OSI layer used for sorting
ep: dict[str, int | str] = {'layer': layer}
layers[prot_key] = ep
# TODO: validation and resolving of names:
# - each param via a validator provided as part of the
# prot_params def? (also see `"port"` case below..)
# - do a resolv step that will check addrs against
# any loaded network.resolv: dict[str, str]
rparams: list = list(reversed(params))
for key in prot_params[prot_key]:
val: str | int = rparams.pop()
# TODO: UGHH, dunno what we should do for validation
# here, put it in the params spec somehow?
if key == 'port':
val = int(val)
ep[key] = val
return layers
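
Worked through by hand for a tcp-over-ipv4 multiaddress (using only entries from the `prots`/`prot_params` tables above):

    layers = parse_maddr('/ipv4/127.0.0.1/tcp/1616')
    assert layers == {
        'ipv4': {'layer': 3, 'addr': '127.0.0.1'},
        'tcp': {'layer': 4, 'port': 1616},  # 'port' coerced to `int`
    }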


@ -461,7 +461,12 @@ class LocalPortal:
actor: 'Actor' # type: ignore # noqa
channel: Channel
async def run_from_ns(self, ns: str, func_name: str, **kwargs) -> Any:
async def run_from_ns(
self,
ns: str,
func_name: str,
**kwargs,
) -> Any:
'''
Run a requested local function from a namespace path and
return its result.


@ -25,7 +25,6 @@ import logging
import signal
import sys
import os
import typing
import warnings
@ -47,8 +46,14 @@ from ._exceptions import is_multi_cancelled
# set at startup and after forks
_default_arbiter_host: str = '127.0.0.1'
_default_arbiter_port: int = 1616
_default_host: str = '127.0.0.1'
_default_port: int = 1616
# default registry always on localhost
_default_lo_addrs: list[tuple[str, int]] = [(
_default_host,
_default_port,
)]
logger = log.get_logger('tractor')
@ -59,28 +64,32 @@ async def open_root_actor(
*,
# defaults are above
arbiter_addr: tuple[str, int] | None = None,
registry_addrs: list[tuple[str, int]]|None = None,
# defaults are above
registry_addr: tuple[str, int] | None = None,
arbiter_addr: tuple[str, int]|None = None,
name: str | None = 'root',
name: str|None = 'root',
# either the `multiprocessing` start method:
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
# OR `trio` (the new default).
start_method: _spawn.SpawnMethodKey | None = None,
start_method: _spawn.SpawnMethodKey|None = None,
# enables the multi-process debugger support
debug_mode: bool = False,
# internal logging
loglevel: str | None = None,
loglevel: str|None = None,
enable_modules: list | None = None,
rpc_module_paths: list | None = None,
enable_modules: list|None = None,
rpc_module_paths: list|None = None,
) -> typing.Any:
# NOTE: allow caller to ensure that only one registry exists
# and that this call creates it.
ensure_registry: bool = False,
) -> Actor:
'''
Runtime init entry point for ``tractor``.
@ -100,7 +109,11 @@ async def open_root_actor(
_state._runtime_vars['_is_root'] = True
# caps based rpc list
enable_modules = enable_modules or []
enable_modules = (
enable_modules
or
[]
)
if rpc_module_paths:
warnings.warn(
@ -116,20 +129,19 @@ async def open_root_actor(
if arbiter_addr is not None:
warnings.warn(
'`arbiter_addr` is now deprecated and has been renamed to'
'`registry_addr`.\nUse that instead..',
'`arbiter_addr` is now deprecated\n'
'Use `registry_addrs: list[tuple]` instead..',
DeprecationWarning,
stacklevel=2,
)
registry_addrs = [arbiter_addr]
registry_addr = (host, port) = (
registry_addr
or arbiter_addr
or (
_default_arbiter_host,
_default_arbiter_port,
)
registry_addrs: list[tuple[str, int]] = (
registry_addrs
or
_default_lo_addrs
)
assert registry_addrs
loglevel = (
loglevel
@ -177,73 +189,131 @@ async def open_root_actor(
'`stackscope` not installed for use in debug mode!'
)
try:
# make a temporary connection to see if an arbiter exists,
# if one can't be made quickly we assume none exists.
arbiter_found = False
# closed into below ping task-func
ponged_addrs: list[tuple[str, int]] = []
# TODO: this connect-and-bail forces us to have to carefully
# rewrap TCP 104-connection-reset errors as EOF so as to avoid
# propagating cancel-causing errors to the channel-msg loop
# machinery. Likely it would be better to eventually have
# a "discovery" protocol with basic handshake instead.
with trio.move_on_after(1):
async with _connect_chan(host, port):
arbiter_found = True
async def ping_tpt_socket(
addr: tuple[str, int],
timeout: float = 1,
) -> None:
'''
Attempt temporary connection to see if a registry is
listening at the requested address by a transport layer
ping.
except OSError:
# TODO: make this a "discovery" log level?
logger.warning(f"No actor registry found @ {host}:{port}")
If a connection can't be made quickly we assume no
server is listening at that addr.
# create a local actor and start up its main routine/task
if arbiter_found:
'''
try:
# TODO: this connect-and-bail forces us to have to
# carefully rewrap TCP 104-connection-reset errors as
# EOF so as to avoid propagating cancel-causing errors
# to the channel-msg loop machinery. Likely it would
# be better to eventually have a "discovery" protocol
# with basic handshake instead?
with trio.move_on_after(timeout):
async with _connect_chan(*addr):
ponged_addrs.append(addr)
except OSError:
# TODO: make this a "discovery" log level?
logger.warning(f'No actor registry found @ {addr}')
async with trio.open_nursery() as tn:
for addr in registry_addrs:
tn.start_soon(
ping_tpt_socket,
tuple(addr), # TODO: just drop this requirement?
)
trans_bind_addrs: list[tuple[str, int]] = []
# Create a new local root-actor instance which IS NOT THE
# REGISTRAR
if ponged_addrs:
if ensure_registry:
raise RuntimeError(
f'Failed to open `{name}`@{ponged_addrs}: '
'registry socket(s) already bound'
)
# we were able to connect to an arbiter
logger.info(f"Arbiter seems to exist @ {host}:{port}")
logger.info(
f'Registry(s) seem(s) to exist @ {ponged_addrs}'
)
actor = Actor(
name or 'anonymous',
arbiter_addr=registry_addr,
name=name or 'anonymous',
registry_addrs=ponged_addrs,
loglevel=loglevel,
enable_modules=enable_modules,
)
host, port = (host, 0)
# DO NOT use the registry_addrs as the transport server
# addrs for this new non-registrar, root-actor.
for host, port in ponged_addrs:
# NOTE: zero triggers dynamic OS port allocation
trans_bind_addrs.append((host, 0))
# Start this local actor as the "registrar", aka a regular
# actor who manages the local registry of "mailboxes" of
# other process-tree-local sub-actors.
else:
# start this local actor as the arbiter (aka a regular actor who
# manages the local registry of "mailboxes")
# Note that if the current actor is the arbiter it is desirable
# for it to stay up indefinitely until a re-election process has
# taken place - which is not implemented yet FYI).
# NOTE that if the current actor IS THE REGISTRAR, the
# following init steps are taken:
# - the transport layer server is bound to each (host, port)
# pair defined in provided registry_addrs, or the default.
trans_bind_addrs = registry_addrs
# - it is normally desirable for any registrar to stay up
# indefinitely until either all registered (child/sub)
# actors are terminated (via SC supervision) or,
# a re-election process has taken place.
# NOTE: all of ^ which is not implemented yet - see:
# https://github.com/goodboy/tractor/issues/216
# https://github.com/goodboy/tractor/pull/348
# https://github.com/goodboy/tractor/issues/296
actor = Arbiter(
name or 'arbiter',
arbiter_addr=registry_addr,
name or 'registrar',
registry_addrs=registry_addrs,
loglevel=loglevel,
enable_modules=enable_modules,
)
# Start up main task set via core actor-runtime nurseries.
try:
# assign process-local actor
_state._current_actor = actor
# start local channel-server and fake the portal API
# NOTE: this won't block since we provide the nursery
logger.info(f"Starting local {actor} @ {host}:{port}")
ml_addrs_str: str = '\n'.join(
f'@{addr}' for addr in trans_bind_addrs
)
logger.info(
f'Starting local {actor.uid} on the following transport addrs:\n'
f'{ml_addrs_str}'
)
# start the actor runtime in a new task
async with trio.open_nursery() as nursery:
# ``_runtime.async_main()`` creates an internal nursery and
# thus blocks here until the entire underlying actor tree has
# terminated thereby conducting structured concurrency.
# ``_runtime.async_main()`` creates an internal nursery
# and blocks here until any underlying actor(-process)
# tree has terminated thereby conducting so called
# "end-to-end" structured concurrency throughout an
# entire hierarchical python sub-process set; all
# "actor runtime" primitives are SC-compat and thus all
# transitively spawned actors/processes must be as
# well.
await nursery.start(
partial(
async_main,
actor,
accept_addr=(host, port),
accept_addrs=trans_bind_addrs,
parent_addr=None
)
)
@ -255,7 +325,7 @@ async def open_root_actor(
BaseExceptionGroup,
) as err:
entered = await _debug._maybe_enter_pm(err)
entered: bool = await _debug._maybe_enter_pm(err)
if (
not entered
and
@ -263,7 +333,8 @@ async def open_root_actor(
):
logger.exception('Root actor crashed:\n')
# always re-raise
# ALWAYS re-raise any error bubbled up from the
# runtime!
raise
finally:
@ -284,7 +355,7 @@ async def open_root_actor(
_state._current_actor = None
_state._last_actor_terminated = actor
# restore breakpoint hook state
# restore built-in `breakpoint()` hook state
sys.breakpointhook = builtin_bp_handler
if orig_bp_path is not None:
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
@ -300,10 +371,7 @@ def run_daemon(
# runtime kwargs
name: str | None = 'root',
registry_addr: tuple[str, int] = (
_default_arbiter_host,
_default_arbiter_port,
),
registry_addrs: list[tuple[str, int]] = _default_lo_addrs,
start_method: str | None = None,
debug_mode: bool = False,
@ -327,7 +395,7 @@ def run_daemon(
async def _main():
async with open_root_actor(
registry_addr=registry_addr,
registry_addrs=registry_addrs,
name=name,
start_method=start_method,
debug_mode=debug_mode,

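Pulling the `_root.py` changes together, a sketch of a non-registrar root actor under the new API (addresses assumed; behavior per the ping/bind logic above):

    import tractor
    import trio

    async def main():
        # with a registrar already listening @ 127.0.0.1:1616 this
        # root binds its own server on the same host(s) but with
        # OS-assigned ports (port=0 at bind time)
        async with tractor.open_root_actor(
            registry_addrs=[('127.0.0.1', 1616)],
            # ensure_registry=True,  # raise if a registrar already
            #                        # has those sockets bound
        ):
            actor = tractor.current_actor()
            print(actor.accept_addrs)  # eg. [('127.0.0.1', 54321)]

    trio.run(main)
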

@ -45,6 +45,7 @@ from functools import partial
from itertools import chain
import importlib
import importlib.util
import os
from pprint import pformat
import signal
import sys
@ -55,7 +56,7 @@ from typing import (
)
import uuid
from types import ModuleType
import os
import warnings
import trio
from trio import (
@ -77,8 +78,8 @@ from ._exceptions import (
ContextCancelled,
TransportClosed,
)
from ._discovery import get_arbiter
from .devx import _debug
from ._discovery import get_registry
from ._portal import Portal
from . import _state
from . import _mp_fixup_main
@ -127,6 +128,11 @@ class Actor:
# ugh, we need to get rid of this and replace with a "registry" sys
# https://github.com/goodboy/tractor/issues/216
is_arbiter: bool = False
@property
def is_registrar(self) -> bool:
return self.is_arbiter
msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()` after fork
@ -162,10 +168,14 @@ class Actor:
name: str,
*,
enable_modules: list[str] = [],
uid: str | None = None,
loglevel: str | None = None,
uid: str|None = None,
loglevel: str|None = None,
registry_addrs: list[tuple[str, int]]|None = None,
spawn_method: str|None = None,
# TODO: remove!
arbiter_addr: tuple[str, int] | None = None,
spawn_method: str | None = None
) -> None:
'''
This constructor is called in the parent actor **before** the spawning
@ -189,27 +199,30 @@ class Actor:
# always include debugging tools module
enable_modules.append('tractor.devx._debug')
mods = {}
self.enable_modules: dict[str, str] = {}
for name in enable_modules:
mod = importlib.import_module(name)
mods[name] = _get_mod_abspath(mod)
mod: ModuleType = importlib.import_module(name)
self.enable_modules[name] = _get_mod_abspath(mod)
self.enable_modules = mods
self._mods: dict[str, ModuleType] = {}
self.loglevel = loglevel
self.loglevel: str = loglevel
self._arb_addr: tuple[str, int] | None = (
str(arbiter_addr[0]),
int(arbiter_addr[1])
) if arbiter_addr else None
if arbiter_addr is not None:
warnings.warn(
'`Actor(arbiter_addr=<blah>)` is now deprecated.\n'
'Use `registry_addrs: list[tuple]` instead.',
DeprecationWarning,
stacklevel=2,
)
registry_addrs: list[tuple[str, int]] = [arbiter_addr]
# marked by the process spawning backend at startup
# will be None for the parent most process started manually
# by the user (currently called the "arbiter")
self._spawn_method = spawn_method
self._spawn_method: str = spawn_method
self._peers: defaultdict = defaultdict(list)
self._peer_connected: dict = {}
self._peer_connected: dict[tuple[str, str], trio.Event] = {}
self._no_more_peers = trio.Event()
self._no_more_peers.set()
self._ongoing_rpc_tasks = trio.Event()
@ -239,6 +252,45 @@ class Actor:
ActorNursery | None,
] = {} # type: ignore # noqa
# when provided, init the registry addresses property from
# input via the validator.
self._reg_addrs: list[tuple[str, int]] = []
if registry_addrs:
self.reg_addrs: list[tuple[str, int]] = registry_addrs
_state._runtime_vars['_registry_addrs'] = registry_addrs
@property
def reg_addrs(self) -> list[tuple[str, int]]:
'''
List of (socket) addresses for all known (and contactable)
registry actors.
'''
return self._reg_addrs
@reg_addrs.setter
def reg_addrs(
self,
addrs: list[tuple[str, int]],
) -> None:
if not addrs:
log.warning(
'Empty registry address list is invalid:\n'
f'{addrs}'
)
return
# always sanity check the input list since it's critical
# that addrs are correct for discovery sys operation.
for addr in addrs:
if not isinstance(addr, tuple):
raise ValueError(
'Expected `Actor.reg_addrs: list[tuple[str, int]]`\n'
f'Got {addrs}'
)
self._reg_addrs = addrs
async def wait_for_peer(
self, uid: tuple[str, str]
) -> tuple[trio.Event, Channel]:
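
The setter's validation contract in short (a sketch; assumes a running actor):

    actor = tractor.current_actor()

    # ok: a list of (host, port) tuples
    actor.reg_addrs = [('127.0.0.1', 1616)]

    # raises `ValueError`: entries must be tuples, not strs
    # actor.reg_addrs = ['127.0.0.1:1616']

    # ignored with a warning; the prior value is kept
    actor.reg_addrs = []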
@ -336,6 +388,12 @@ class Actor:
self._no_more_peers = trio.Event() # unset by making new
chan = Channel.from_stream(stream)
their_uid: tuple[str, str]|None = chan.uid
if their_uid:
log.warning(
f'Re-connection from already known {their_uid}'
)
else:
log.runtime(f'New connection to us @{chan.raddr}')
con_msg: str = ''
if their_uid:
@ -517,16 +575,19 @@ class Actor:
if disconnected:
# if the transport died and this actor is still
# registered within a local nursery, we report that the
# IPC layer may have failed unexpectedly since it may be
# the cause of other downstream errors.
# registered within a local nursery, we report
# that the IPC layer may have failed
# unexpectedly since it may be the cause of
# other downstream errors.
entry = local_nursery._children.get(uid)
if entry:
proc: trio.Process
_, proc, _ = entry
poll = getattr(proc, 'poll', None)
if poll and poll() is None:
if (
(poll := getattr(proc, 'poll', None))
and poll() is None
):
log.cancel(
f'Peer IPC broke but subproc is alive?\n\n'
@ -880,11 +941,11 @@ class Actor:
)
await chan.connect()
# TODO: move this into a `Channel.handshake()`?
# Initial handshake: swap names.
await self._do_handshake(chan)
accept_addr: tuple[str, int] | None = None
accept_addrs: list[tuple[str, int]] | None = None
if self._spawn_method == "trio":
# Receive runtime state from our parent
parent_data: dict[str, Any]
@ -897,10 +958,7 @@ class Actor:
# if "trace"/"util" mode is enabled?
f'{pformat(parent_data)}\n'
)
accept_addr = (
parent_data.pop('bind_host'),
parent_data.pop('bind_port'),
)
accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs')
rvs = parent_data.pop('_runtime_vars')
if rvs['_debug_mode']:
@ -918,18 +976,23 @@ class Actor:
_state._runtime_vars.update(rvs)
for attr, value in parent_data.items():
if attr == '_arb_addr':
if (
attr == 'reg_addrs'
and value
):
# XXX: ``msgspec`` doesn't support serializing tuples
# so just cast manually here since it's what our
# internals expect.
value = tuple(value) if value else None
self._arb_addr = value
# TODO: we don't really NEED these as
# tuples so we can probably drop this
# casting since apparently in python lists
# are "more efficient"?
self.reg_addrs = [tuple(val) for val in value]
else:
setattr(self, attr, value)
return chan, accept_addr
return chan, accept_addrs
except OSError: # failed to connect
log.warning(
@ -946,9 +1009,9 @@ class Actor:
handler_nursery: Nursery,
*,
# (host, port) to bind for channel server
accept_host: tuple[str, int] | None = None,
accept_port: int = 0,
task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
listen_sockaddrs: list[tuple[str, int]] | None = None,
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Start the IPC transport server, begin listening for new connections.
@ -958,30 +1021,40 @@ class Actor:
`.cancel_server()` is called.
'''
if listen_sockaddrs is None:
listen_sockaddrs = [(None, 0)]
self._server_down = trio.Event()
try:
async with trio.open_nursery() as server_n:
listeners: list[trio.abc.Listener] = await server_n.start(
partial(
trio.serve_tcp,
self._stream_handler,
# new connections will stay alive even if this server
# is cancelled
handler_nursery=handler_nursery,
port=accept_port,
host=accept_host,
for host, port in listen_sockaddrs:
listeners: list[trio.abc.Listener] = await server_n.start(
partial(
trio.serve_tcp,
handler=self._stream_handler,
port=port,
host=host,
# NOTE: configured such that new
# connections will stay alive even if
# this server is cancelled!
handler_nursery=handler_nursery,
)
)
)
sockets: list[trio.socket] = [
getattr(listener, 'socket', 'unknown socket')
for listener in listeners
]
log.runtime(
'Started TCP server(s)\n'
f'|_{sockets}\n'
)
self._listeners.extend(listeners)
sockets: list[trio.socket] = [
getattr(listener, 'socket', 'unknown socket')
for listener in listeners
]
log.runtime(
'Started TCP server(s)\n'
f'|_{sockets}\n'
)
self._listeners.extend(listeners)
task_status.started(server_n)
finally:
# signal the server is down since nursery above terminated
self._server_down.set()
@ -1318,6 +1391,19 @@ class Actor:
log.runtime("Shutting down channel server")
self._server_n.cancel_scope.cancel()
@property
def accept_addrs(self) -> list[tuple[str, int]]:
'''
All addresses to which the transport-channel server binds
and listens for new connections.
'''
# throws OSError on failure
return [
listener.socket.getsockname()
for listener in self._listeners
] # type: ignore
@property
def accept_addr(self) -> tuple[str, int]:
'''
@ -1326,7 +1412,7 @@ class Actor:
'''
# throws OSError on failure
return self._listeners[0].socket.getsockname() # type: ignore
return self.accept_addrs[0]
def get_parent(self) -> Portal:
'''
@ -1343,6 +1429,7 @@ class Actor:
'''
return self._peers[uid]
# TODO: move to `Channel.handshake(uid)`
async def _do_handshake(
self,
chan: Channel
@ -1379,7 +1466,7 @@ class Actor:
async def async_main(
actor: Actor,
accept_addr: tuple[str, int] | None = None,
accept_addrs: list[tuple[str, int]] | None = None,
# XXX: currently ``parent_addr`` is only needed for the
# ``multiprocessing`` backend (which pickles state sent to
@ -1407,20 +1494,25 @@ async def async_main(
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
registered_with_arbiter = False
is_registered: bool = False
try:
# establish primary connection with immediate parent
actor._parent_chan = None
actor._parent_chan: Channel | None = None
if parent_addr is not None:
actor._parent_chan, accept_addr_rent = await actor._from_parent(
parent_addr)
(
actor._parent_chan,
set_accept_addr_says_rent,
) = await actor._from_parent(parent_addr)
# either it's passed in because we're not a child
# or because we're running in mp mode
if accept_addr_rent is not None:
accept_addr = accept_addr_rent
# either it's passed in because we're not a child or
# because we're running in mp mode
if (
set_accept_addr_says_rent
and set_accept_addr_says_rent is not None
):
accept_addrs = set_accept_addr_says_rent
# The "root" nursery ensures the channel with the immediate
# parent is kept alive as a resilient service until
@ -1460,34 +1552,72 @@ async def async_main(
# - subactor: the bind address is sent by our parent
# over our established channel
# - root actor: the ``accept_addr`` passed to this method
assert accept_addr
host, port = accept_addr
assert accept_addrs
actor._server_n = await service_nursery.start(
partial(
actor._serve_forever,
service_nursery,
accept_host=host,
accept_port=port
try:
actor._server_n = await service_nursery.start(
partial(
actor._serve_forever,
service_nursery,
listen_sockaddrs=accept_addrs,
)
)
)
accept_addr = actor.accept_addr
except OSError as oserr:
# NOTE: always allow runtime hackers to debug
# transport address bind errors - normally it's
# something silly like the wrong socket-address
# passed via a config or CLI Bo
entered_debug: bool = await _debug._maybe_enter_pm(oserr)
if not entered_debug:
log.exception('Failed to init IPC channel server !?\n')
raise
accept_addrs: list[tuple[str, int]] = actor.accept_addrs
# NOTE: only set the loopback addr for the
# process-tree-global "root" mailbox since
# all sub-actors should be able to speak to
# their root actor over that channel.
if _state._runtime_vars['_is_root']:
_state._runtime_vars['_root_mailbox'] = accept_addr
for addr in accept_addrs:
host, _ = addr
# TODO: generic 'lo' detector predicate
if '127.0.0.1' in host:
_state._runtime_vars['_root_mailbox'] = addr
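For the TODO'd generic "lo" detector, one plausible predicate (a sketch, not part of this patch) using the stdlib `ipaddress` module:

    import ipaddress

    def is_loopback(host: str) -> bool:
        try:
            return ipaddress.ip_address(host).is_loopback
        except ValueError:
            # not a raw IP addr; fall back to common aliases
            return host == 'localhost'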
# Register with the arbiter if we're told its addr
log.runtime(f"Registering {actor} for role `{actor.name}`")
assert isinstance(actor._arb_addr, tuple)
log.runtime(
f'Registering `{actor.name}` ->\n'
f'{pformat(accept_addrs)}'
)
async with get_arbiter(*actor._arb_addr) as arb_portal:
await arb_portal.run_from_ns(
'self',
'register_actor',
uid=actor.uid,
sockaddr=accept_addr,
)
# TODO: ideally we don't fan out to all registrars
# if addresses point to the same actor..
# So we need a way to detect that? maybe iterate
# only on unique actor uids?
for addr in actor.reg_addrs:
try:
assert isinstance(addr, tuple)
assert addr[1] # non-zero after bind
except AssertionError:
await _debug.pause()
registered_with_arbiter = True
async with get_registry(*addr) as reg_portal:
for accept_addr in accept_addrs:
if not accept_addr[1]:
await _debug.pause()
assert accept_addr[1]
await reg_portal.run_from_ns(
'self',
'register_actor',
uid=actor.uid,
sockaddr=accept_addr,
)
is_registered: bool = True
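Regarding the fan-out TODO above, a first-cut helper (hypothetical, not in this patch) could at least skip exact-duplicate sockaddrs; detecting *distinct* addrs that reach the same registrar would still require some uid handshake:

    def unique_reg_addrs(
        addrs: list[tuple[str, int]],
    ) -> list[tuple[str, int]]:
        seen: set[tuple[str, int]] = set()
        unique: list[tuple[str, int]] = []
        for addr in addrs:
            if addr not in seen:
                seen.add(addr)
                unique.append(addr)
        return unique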
# init steps complete
task_status.started()
@ -1520,18 +1650,20 @@ async def async_main(
log.runtime("Closing all actor lifetime contexts")
actor.lifetime_stack.close()
if not registered_with_arbiter:
if not is_registered:
# TODO: I guess we could try to connect back
# to the parent through a channel and engage a debugger
# once we have that all working with std streams locking?
log.exception(
f"Actor errored and failed to register with arbiter "
f"@ {actor._arb_addr}?")
f"@ {actor.reg_addrs[0]}?")
log.error(
"\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n"
"\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n"
"\tYOUR PARENT CODE IS GOING TO KEEP WORKING FINE!!!\n"
"\tTHIS IS HOW RELIABlE SYSTEMS ARE SUPPOSED TO WORK!?!?\n"
"\n\n\t^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n"
"\t>> CALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN <<\n\n"
"\tIf this is a sub-actor hopefully its parent will keep running "
"correctly presuming this error was safely ignored..\n\n"
"\tPLEASE REPORT THIS TRACEBACK IN A BUG REPORT: "
"https://github.com/goodboy/tractor/issues\n"
)
if actor._parent_chan:
@ -1571,27 +1703,33 @@ async def async_main(
# Unregister actor from the registry-sys / registrar.
if (
registered_with_arbiter
and not actor.is_arbiter
is_registered
and not actor.is_registrar
):
failed = False
assert isinstance(actor._arb_addr, tuple)
with trio.move_on_after(0.5) as cs:
cs.shield = True
try:
async with get_arbiter(*actor._arb_addr) as arb_portal:
await arb_portal.run_from_ns(
'self',
'unregister_actor',
uid=actor.uid
)
except OSError:
failed: bool = False
for addr in actor.reg_addrs:
assert isinstance(addr, tuple)
with trio.move_on_after(0.5) as cs:
cs.shield = True
try:
async with get_registry(
*addr,
) as reg_portal:
await reg_portal.run_from_ns(
'self',
'unregister_actor',
uid=actor.uid
)
except OSError:
failed = True
if cs.cancelled_caught:
failed = True
if cs.cancelled_caught:
failed = True
if failed:
log.warning(
f"Failed to unregister {actor.name} from arbiter")
if failed:
log.warning(
f'Failed to unregister {actor.name} from '
f'registrar @ {addr}'
)
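The unregister loop relies on a shielded `move_on_after()` so each teardown RPC gets a hard deadline yet can't be skipped by outer cancellation. The pattern in isolation (a sketch, with a hypothetical `afn` callable):

    import trio
    from collections.abc import Awaitable, Callable

    async def best_effort(afn: Callable[[], Awaitable[None]]) -> bool:
        with trio.move_on_after(0.5) as cs:
            cs.shield = True  # ignore cancellation from enclosing scopes
            try:
                await afn()
                return True
            except OSError:
                pass
        # timed out or the transport errored
        return False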
# Ensure all peers (actors connected to us as clients) are finished
if not actor._no_more_peers.is_set():
@ -1610,18 +1748,36 @@ async def async_main(
# TODO: rename to `Registry` and move to `._discovery`!
class Arbiter(Actor):
'''
A special actor who knows all the other actors and always has
access to a top level nursery.
A special registrar actor who can contact all other actors
within its immediate process tree and possibly keeps a registry
of others meant to be discoverable in a distributed
application. Normally the registrar is also the "root actor"
and thus always has access to the top-most-level actor
(process) nursery.
The arbiter is by default the first actor spawned on each host
and is responsible for keeping track of all other actors for
coordination purposes. If a new main process is launched and an
arbiter is already running that arbiter will be used.
By default, a registrar is always initialized if no other
registrar socket addrs have been specified to a runtime init
entry-point (such as `open_root_actor()` or `open_nursery()`).
Any time a new main process is launched (and thus a new root
actor created), if no existing registrar can be contacted at
the provided `registry_addrs`, then a new one is created;
however, if one can be reached it is used instead.
Normally a distributed app requires at least one registrar per
logical host: for that given "host space" (aka the localhost
IPC domain of addresses) it is responsible for making all other
locally bound actors *discoverable* to actor trees running on
remote hosts.
'''
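In user terms this means something like the following (a sketch assuming the post-patch `registry_addrs` kwarg shown elsewhere in this diff; addrs are illustrative):

    import tractor
    import trio

    async def main() -> None:
        async with tractor.open_root_actor(
            registry_addrs=[
                ('127.0.0.1', 1616),  # illustrative port
                ('10.0.0.2', 1616),   # assumed-reachable remote registrar
            ],
        ):
            # if no registrar answers at these addrs, this root actor
            # becomes the registrar itself per the semantics above
            ...

    trio.run(main)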
is_arbiter = True
def __init__(self, *args, **kwargs) -> None:
def __init__(
self,
*args,
**kwargs,
) -> None:
self._registry: dict[
tuple[str, str],
@ -1663,7 +1819,10 @@ class Arbiter(Actor):
# unpacker since we have tuples as keys (note this makes the
# arbiter susceptible to hashdos):
# https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10
return {'.'.join(key): val for key, val in self._registry.items()}
return {
'.'.join(key): val
for key, val in self._registry.items()
}
async def wait_for_actor(
self,
@ -1706,8 +1865,15 @@ class Arbiter(Actor):
sockaddr: tuple[str, int]
) -> None:
uid = name, _ = (str(uid[0]), str(uid[1]))
self._registry[uid] = (str(sockaddr[0]), int(sockaddr[1]))
uid = name, hash = (str(uid[0]), str(uid[1]))
addr = (host, port) = (
str(sockaddr[0]),
int(sockaddr[1]),
)
if port == 0:
await _debug.pause()
assert port # should never be 0-dynamic-os-alloc
self._registry[uid] = addr
# pop and signal all waiter events
events = self._waiters.pop(name, [])
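For symmetry with the `register_actor`/`unregister_actor` endpoints invoked via `run_from_ns()` above, a lookup against a registrar might read as follows (a sketch: the `get_registry` import path and the `find_actor` endpoint are assumptions, not confirmed by this diff):

    from tractor._discovery import get_registry  # import path assumed

    async def lookup_peer(
        reg_addr: tuple[str, int],
        name: str,
    ) -> tuple[str, int] | None:
        async with get_registry(*reg_addr) as reg_portal:
            # `find_actor` is assumed exposed on the registrar's
            # namespace alongside `register_actor`/`unregister_actor`
            return await reg_portal.run_from_ns(
                'self',
                'find_actor',
                name=name,
            )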

View File

@ -220,6 +220,10 @@ async def hard_kill(
# NOTE: for mucking with `.pause()`-ing inside the runtime
# whilst also hacking on it XD
# terminate_after: int = 99999,
) -> None:
'''
Un-gracefully terminate an OS level `trio.Process` after timeout.
@ -365,7 +369,7 @@ async def new_proc(
errors: dict[tuple[str, str], Exception],
# passed through to actor main
bind_addr: tuple[str, int],
bind_addrs: list[tuple[str, int]],
parent_addr: tuple[str, int],
_runtime_vars: dict[str, Any], # serialized and sent to _child
@ -387,7 +391,7 @@ async def new_proc(
actor_nursery,
subactor,
errors,
bind_addr,
bind_addrs,
parent_addr,
_runtime_vars, # run time vars
infect_asyncio=infect_asyncio,
@ -402,7 +406,7 @@ async def trio_proc(
errors: dict[tuple[str, str], Exception],
# passed through to actor main
bind_addr: tuple[str, int],
bind_addrs: list[tuple[str, int]],
parent_addr: tuple[str, int],
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
@ -491,12 +495,11 @@ async def trio_proc(
# send additional init params
await chan.send({
"_parent_main_data": subactor._parent_main_data,
"enable_modules": subactor.enable_modules,
"_arb_addr": subactor._arb_addr,
"bind_host": bind_addr[0],
"bind_port": bind_addr[1],
"_runtime_vars": _runtime_vars,
'_parent_main_data': subactor._parent_main_data,
'enable_modules': subactor.enable_modules,
'reg_addrs': subactor.reg_addrs,
'bind_addrs': bind_addrs,
'_runtime_vars': _runtime_vars,
})
# track subactor in current nursery
@ -602,7 +605,7 @@ async def mp_proc(
subactor: Actor,
errors: dict[tuple[str, str], Exception],
# passed through to actor main
bind_addr: tuple[str, int],
bind_addrs: list[tuple[str, int]],
parent_addr: tuple[str, int],
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
@ -660,7 +663,7 @@ async def mp_proc(
target=_mp_main,
args=(
subactor,
bind_addr,
bind_addrs,
fs_info,
_spawn_method,
parent_addr,

View File

@ -33,7 +33,8 @@ _last_actor_terminated: Actor|None = None
_runtime_vars: dict[str, Any] = {
'_debug_mode': False,
'_is_root': False,
'_root_mailbox': (None, None)
'_root_mailbox': (None, None),
'_registry_addrs': [],
}
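A subactor could then introspect the tree-wide registrar addrs via this runtime-vars entry (a sketch; assumes the root actor populates the list during runtime init):

    from tractor import _state

    def tree_registry_addrs() -> list[tuple[str, int]]:
        # empty until the root actor fills it in at init
        return _state._runtime_vars['_registry_addrs']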

View File

@ -22,10 +22,7 @@ from contextlib import asynccontextmanager as acm
from functools import partial
import inspect
from pprint import pformat
from typing import (
Optional,
TYPE_CHECKING,
)
from typing import TYPE_CHECKING
import typing
import warnings
@ -97,7 +94,7 @@ class ActorNursery:
tuple[
Actor,
trio.Process | mp.Process,
Optional[Portal],
Portal | None,
]
] = {}
# portals spawned with ``run_in_actor()`` are
@ -121,12 +118,12 @@ class ActorNursery:
self,
name: str,
*,
bind_addr: tuple[str, int] = _default_bind_addr,
bind_addrs: list[tuple[str, int]] = [_default_bind_addr],
rpc_module_paths: list[str] | None = None,
enable_modules: list[str] | None = None,
loglevel: str | None = None, # set log level per subactor
nursery: trio.Nursery | None = None,
debug_mode: Optional[bool] | None = None,
debug_mode: bool | None = None,
infect_asyncio: bool = False,
) -> Portal:
'''
@ -161,7 +158,9 @@ class ActorNursery:
# modules allowed to invoked funcs from
enable_modules=enable_modules,
loglevel=loglevel,
arbiter_addr=current_actor()._arb_addr,
# verbatim relay this actor's registrar addresses
registry_addrs=current_actor().reg_addrs,
)
parent_addr = self._actor.accept_addr
assert parent_addr
@ -178,7 +177,7 @@ class ActorNursery:
self,
subactor,
self.errors,
bind_addr,
bind_addrs,
parent_addr,
_rtv, # run time vars
infect_asyncio=infect_asyncio,
@ -191,8 +190,8 @@ class ActorNursery:
fn: typing.Callable,
*,
name: Optional[str] = None,
bind_addr: tuple[str, int] = _default_bind_addr,
name: str | None = None,
bind_addrs: list[tuple[str, int]] = [_default_bind_addr],
rpc_module_paths: list[str] | None = None,
enable_modules: list[str] | None = None,
loglevel: str | None = None, # set log level per subactor
@ -221,7 +220,7 @@ class ActorNursery:
enable_modules=[mod_path] + (
enable_modules or rpc_module_paths or []
),
bind_addr=bind_addr,
bind_addrs=bind_addrs,
loglevel=loglevel,
# use the run_in_actor nursery
nursery=self._ria_nursery,
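Putting the new plural params together, a multihomed subactor spawn might look like this (hosts are illustrative; port 0 requests an OS-allocated port, which is why the registrar asserts non-zero ports only *after* bind):

    import tractor
    import trio

    async def main() -> None:
        async with tractor.open_nursery() as an:
            portal = await an.start_actor(
                'multihomed_svc',
                # one transport server per sockaddr
                bind_addrs=[
                    ('127.0.0.1', 0),
                    ('192.168.1.10', 0),  # assumed local iface
                ],
            )
            await portal.cancel_actor()

    trio.run(main)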