forked from goodboy/tractor
1
0
Fork 0

Port all tests to new `reg_addr` fixture name

multihomed
Tyler Goodlet 2023-10-18 15:39:20 -04:00
parent 6b1ceee19f
commit 0e9457299c
13 changed files with 126 additions and 99 deletions

View File

@ -47,7 +47,7 @@ async def do_nuthin():
],
ids=['no_args', 'unexpected_args'],
)
def test_remote_error(arb_addr, args_err):
def test_remote_error(reg_addr, args_err):
"""Verify an error raised in a subactor that is propagated
to the parent nursery, contains the underlying boxed builtin
error type info and causes cancellation and reraising all the
@ -57,7 +57,7 @@ def test_remote_error(arb_addr, args_err):
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as nursery:
# on a remote type error caused by bad input args
@ -97,7 +97,7 @@ def test_remote_error(arb_addr, args_err):
assert exc.type == errtype
def test_multierror(arb_addr):
def test_multierror(reg_addr):
'''
Verify we raise a ``BaseExceptionGroup`` out of a nursery where
more then one actor errors.
@ -105,7 +105,7 @@ def test_multierror(arb_addr):
'''
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as nursery:
await nursery.run_in_actor(assert_err, name='errorer1')
@ -130,14 +130,14 @@ def test_multierror(arb_addr):
@pytest.mark.parametrize(
'num_subactors', range(25, 26),
)
def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay):
"""Verify we raise a ``BaseExceptionGroup`` out of a nursery where
more then one actor errors and also with a delay before failure
to test failure during an ongoing spawning.
"""
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as nursery:
for i in range(num_subactors):
@ -175,15 +175,20 @@ async def do_nothing():
@pytest.mark.parametrize('mechanism', ['nursery_cancel', KeyboardInterrupt])
def test_cancel_single_subactor(arb_addr, mechanism):
"""Ensure a ``ActorNursery.start_actor()`` spawned subactor
def test_cancel_single_subactor(reg_addr, mechanism):
'''
Ensure a ``ActorNursery.start_actor()`` spawned subactor
cancels when the nursery is cancelled.
"""
'''
async def spawn_actor():
"""Spawn an actor that blocks indefinitely.
"""
'''
Spawn an actor that blocks indefinitely then cancel via
either `ActorNursery.cancel()` or an exception raise.
'''
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as nursery:
portal = await nursery.start_actor(

View File

@ -141,7 +141,7 @@ async def open_actor_local_nursery(
)
def test_actor_managed_trio_nursery_task_error_cancels_aio(
asyncio_mode: bool,
arb_addr
reg_addr: tuple,
):
'''
Verify that a ``trio`` nursery created managed in a child actor

View File

@ -5,7 +5,7 @@ Verify the we raise errors when streams are opened prior to
sync-opening a ``tractor.Context`` beforehand.
'''
from contextlib import asynccontextmanager as acm
# from contextlib import asynccontextmanager as acm
from itertools import count
import platform
from typing import Optional

View File

@ -78,7 +78,7 @@ has_nested_actors = pytest.mark.has_nested_actors
def spawn(
start_method,
testdir,
arb_addr,
reg_addr,
) -> 'pexpect.spawn':
if start_method != 'trio':

View File

@ -15,19 +15,19 @@ from conftest import tractor_test
@tractor_test
async def test_reg_then_unreg(arb_addr):
async def test_reg_then_unreg(reg_addr):
actor = tractor.current_actor()
assert actor.is_arbiter
assert len(actor._registry) == 1 # only self is registered
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as n:
portal = await n.start_actor('actor', enable_modules=[__name__])
uid = portal.channel.uid
async with tractor.get_arbiter(*arb_addr) as aportal:
async with tractor.get_arbiter(*reg_addr) as aportal:
# this local actor should be the arbiter
assert actor is aportal.actor
@ -53,15 +53,27 @@ async def hi():
return the_line.format(tractor.current_actor().name)
async def say_hello(other_actor):
async def say_hello(
other_actor: str,
reg_addr: tuple[str, int],
):
await trio.sleep(1) # wait for other actor to spawn
async with tractor.find_actor(other_actor) as portal:
async with tractor.find_actor(
other_actor,
registry_addrs=[reg_addr],
) as portal:
assert portal is not None
return await portal.run(__name__, 'hi')
async def say_hello_use_wait(other_actor):
async with tractor.wait_for_actor(other_actor) as portal:
async def say_hello_use_wait(
other_actor: str,
reg_addr: tuple[str, int],
):
async with tractor.wait_for_actor(
other_actor,
registry_addr=reg_addr,
) as portal:
assert portal is not None
result = await portal.run(__name__, 'hi')
return result
@ -69,21 +81,29 @@ async def say_hello_use_wait(other_actor):
@tractor_test
@pytest.mark.parametrize('func', [say_hello, say_hello_use_wait])
async def test_trynamic_trio(func, start_method, arb_addr):
"""Main tractor entry point, the "master" process (for now
acts as the "director").
"""
async def test_trynamic_trio(
func,
start_method,
reg_addr,
):
'''
Root actor acting as the "director" and running one-shot-task-actors
for the directed subs.
'''
async with tractor.open_nursery() as n:
print("Alright... Action!")
donny = await n.run_in_actor(
func,
other_actor='gretchen',
reg_addr=reg_addr,
name='donny',
)
gretchen = await n.run_in_actor(
func,
other_actor='donny',
reg_addr=reg_addr,
name='gretchen',
)
print(await gretchen.result())
@ -131,7 +151,7 @@ async def unpack_reg(actor_or_portal):
async def spawn_and_check_registry(
arb_addr: tuple,
reg_addr: tuple,
use_signal: bool,
remote_arbiter: bool = False,
with_streaming: bool = False,
@ -139,9 +159,9 @@ async def spawn_and_check_registry(
) -> None:
async with tractor.open_root_actor(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
):
async with tractor.get_arbiter(*arb_addr) as portal:
async with tractor.get_arbiter(*reg_addr) as portal:
# runtime needs to be up to call this
actor = tractor.current_actor()
@ -213,17 +233,19 @@ async def spawn_and_check_registry(
def test_subactors_unregister_on_cancel(
start_method,
use_signal,
arb_addr,
reg_addr,
with_streaming,
):
"""Verify that cancelling a nursery results in all subactors
'''
Verify that cancelling a nursery results in all subactors
deregistering themselves with the arbiter.
"""
'''
with pytest.raises(KeyboardInterrupt):
trio.run(
partial(
spawn_and_check_registry,
arb_addr,
reg_addr,
use_signal,
remote_arbiter=False,
with_streaming=with_streaming,
@ -237,7 +259,7 @@ def test_subactors_unregister_on_cancel_remote_daemon(
daemon,
start_method,
use_signal,
arb_addr,
reg_addr,
with_streaming,
):
"""Verify that cancelling a nursery results in all subactors
@ -248,7 +270,7 @@ def test_subactors_unregister_on_cancel_remote_daemon(
trio.run(
partial(
spawn_and_check_registry,
arb_addr,
reg_addr,
use_signal,
remote_arbiter=True,
with_streaming=with_streaming,
@ -262,7 +284,7 @@ async def streamer(agen):
async def close_chans_before_nursery(
arb_addr: tuple,
reg_addr: tuple,
use_signal: bool,
remote_arbiter: bool = False,
) -> None:
@ -275,9 +297,9 @@ async def close_chans_before_nursery(
entries_at_end = 1
async with tractor.open_root_actor(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
):
async with tractor.get_arbiter(*arb_addr) as aportal:
async with tractor.get_arbiter(*reg_addr) as aportal:
try:
get_reg = partial(unpack_reg, aportal)
@ -329,7 +351,7 @@ async def close_chans_before_nursery(
def test_close_channel_explicit(
start_method,
use_signal,
arb_addr,
reg_addr,
):
"""Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
@ -339,7 +361,7 @@ def test_close_channel_explicit(
trio.run(
partial(
close_chans_before_nursery,
arb_addr,
reg_addr,
use_signal,
remote_arbiter=False,
),
@ -351,7 +373,7 @@ def test_close_channel_explicit_remote_arbiter(
daemon,
start_method,
use_signal,
arb_addr,
reg_addr,
):
"""Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
@ -361,7 +383,7 @@ def test_close_channel_explicit_remote_arbiter(
trio.run(
partial(
close_chans_before_nursery,
arb_addr,
reg_addr,
use_signal,
remote_arbiter=True,
),

View File

@ -47,7 +47,7 @@ async def trio_cancels_single_aio_task():
await tractor.to_asyncio.run_task(sleep_forever)
def test_trio_cancels_aio_on_actor_side(arb_addr):
def test_trio_cancels_aio_on_actor_side(reg_addr):
'''
Spawn an infected actor that is cancelled by the ``trio`` side
task using std cancel scope apis.
@ -55,7 +55,7 @@ def test_trio_cancels_aio_on_actor_side(arb_addr):
'''
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr
registry_addrs=[reg_addr]
) as n:
await n.run_in_actor(
trio_cancels_single_aio_task,
@ -94,7 +94,7 @@ async def asyncio_actor(
raise
def test_aio_simple_error(arb_addr):
def test_aio_simple_error(reg_addr):
'''
Verify a simple remote asyncio error propagates back through trio
to the parent actor.
@ -103,7 +103,7 @@ def test_aio_simple_error(arb_addr):
'''
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr
registry_addrs=[reg_addr]
) as n:
await n.run_in_actor(
asyncio_actor,
@ -120,7 +120,7 @@ def test_aio_simple_error(arb_addr):
assert err.type == AssertionError
def test_tractor_cancels_aio(arb_addr):
def test_tractor_cancels_aio(reg_addr):
'''
Verify we can cancel a spawned asyncio task gracefully.
@ -139,7 +139,7 @@ def test_tractor_cancels_aio(arb_addr):
trio.run(main)
def test_trio_cancels_aio(arb_addr):
def test_trio_cancels_aio(reg_addr):
'''
Much like the above test with ``tractor.Portal.cancel_actor()``
except we just use a standard ``trio`` cancellation api.
@ -194,7 +194,7 @@ async def trio_ctx(
ids='parent_actor_cancels_child={}'.format
)
def test_context_spawns_aio_task_that_errors(
arb_addr,
reg_addr,
parent_cancels: bool,
):
'''
@ -258,7 +258,7 @@ async def aio_cancel():
await sleep_forever()
def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr):
def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
async def main():
async with tractor.open_nursery() as n:
@ -395,7 +395,7 @@ async def stream_from_aio(
'fan_out', [False, True],
ids='fan_out_w_chan_subscribe={}'.format
)
def test_basic_interloop_channel_stream(arb_addr, fan_out):
def test_basic_interloop_channel_stream(reg_addr, fan_out):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
@ -409,7 +409,7 @@ def test_basic_interloop_channel_stream(arb_addr, fan_out):
# TODO: parametrize the above test and avoid the duplication here?
def test_trio_error_cancels_intertask_chan(arb_addr):
def test_trio_error_cancels_intertask_chan(reg_addr):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
@ -428,7 +428,7 @@ def test_trio_error_cancels_intertask_chan(arb_addr):
assert exc.type == Exception
def test_trio_closes_early_and_channel_exits(arb_addr):
def test_trio_closes_early_and_channel_exits(reg_addr):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
@ -443,7 +443,7 @@ def test_trio_closes_early_and_channel_exits(arb_addr):
trio.run(main)
def test_aio_errors_and_channel_propagates_and_closes(arb_addr):
def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
@ -520,7 +520,7 @@ async def trio_to_aio_echo_server(
ids='raise_error={}'.format,
)
def test_echoserver_detailed_mechanics(
arb_addr,
reg_addr,
raise_error_mid_stream,
):

View File

@ -55,7 +55,7 @@ async def context_stream(
async def stream_from_single_subactor(
arb_addr,
reg_addr,
start_method,
stream_func,
):
@ -64,7 +64,7 @@ async def stream_from_single_subactor(
# only one per host address, spawns an actor if None
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
start_method=start_method,
) as nursery:
@ -115,13 +115,13 @@ async def stream_from_single_subactor(
@pytest.mark.parametrize(
'stream_func', [async_gen_stream, context_stream]
)
def test_stream_from_single_subactor(arb_addr, start_method, stream_func):
def test_stream_from_single_subactor(reg_addr, start_method, stream_func):
"""Verify streaming from a spawned async generator.
"""
trio.run(
partial(
stream_from_single_subactor,
arb_addr,
reg_addr,
start_method,
stream_func=stream_func,
),
@ -225,14 +225,14 @@ async def a_quadruple_example():
return result_stream
async def cancel_after(wait, arb_addr):
async with tractor.open_root_actor(arbiter_addr=arb_addr):
async def cancel_after(wait, reg_addr):
async with tractor.open_root_actor(registry_addrs=[reg_addr]):
with trio.move_on_after(wait):
return await a_quadruple_example()
@pytest.fixture(scope='module')
def time_quad_ex(arb_addr, ci_env, spawn_backend):
def time_quad_ex(reg_addr, ci_env, spawn_backend):
if spawn_backend == 'mp':
"""no idea but the mp *nix runs are flaking out here often...
"""
@ -240,7 +240,7 @@ def time_quad_ex(arb_addr, ci_env, spawn_backend):
timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
start = time.time()
results = trio.run(cancel_after, timeout, arb_addr)
results = trio.run(cancel_after, timeout, reg_addr)
diff = time.time() - start
assert results
return results, diff
@ -260,14 +260,14 @@ def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
list(map(lambda i: i/10, range(3, 9)))
)
def test_not_fast_enough_quad(
arb_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend
reg_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend
):
"""Verify we can cancel midway through the quad example and all actors
cancel gracefully.
"""
results, diff = time_quad_ex
delay = max(diff - cancel_delay, 0)
results = trio.run(cancel_after, delay, arb_addr)
results = trio.run(cancel_after, delay, reg_addr)
system = platform.system()
if system in ('Windows', 'Darwin') and results is not None:
# In CI envoirments it seems later runs are quicker then the first
@ -280,7 +280,7 @@ def test_not_fast_enough_quad(
@tractor_test
async def test_respawn_consumer_task(
arb_addr,
reg_addr,
spawn_backend,
loglevel,
):

View File

@ -24,7 +24,7 @@ async def test_no_runtime():
@tractor_test
async def test_self_is_registered(arb_addr):
async def test_self_is_registered(reg_addr):
"Verify waiting on the arbiter to register itself using the standard api."
actor = tractor.current_actor()
assert actor.is_arbiter
@ -34,20 +34,20 @@ async def test_self_is_registered(arb_addr):
@tractor_test
async def test_self_is_registered_localportal(arb_addr):
async def test_self_is_registered_localportal(reg_addr):
"Verify waiting on the arbiter to register itself using a local portal."
actor = tractor.current_actor()
assert actor.is_arbiter
async with tractor.get_arbiter(*arb_addr) as portal:
async with tractor.get_arbiter(*reg_addr) as portal:
assert isinstance(portal, tractor._portal.LocalPortal)
with trio.fail_after(0.2):
sockaddr = await portal.run_from_ns(
'self', 'wait_for_actor', name='root')
assert sockaddr[0] == arb_addr
assert sockaddr[0] == reg_addr
def test_local_actor_async_func(arb_addr):
def test_local_actor_async_func(reg_addr):
"""Verify a simple async function in-process.
"""
nums = []
@ -55,7 +55,7 @@ def test_local_actor_async_func(arb_addr):
async def print_loop():
async with tractor.open_root_actor(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
):
# arbiter is started in-proc if dne
assert tractor.current_actor().is_arbiter

View File

@ -28,9 +28,9 @@ def test_abort_on_sigint(daemon):
@tractor_test
async def test_cancel_remote_arbiter(daemon, arb_addr):
async def test_cancel_remote_arbiter(daemon, reg_addr):
assert not tractor.current_actor().is_arbiter
async with tractor.get_arbiter(*arb_addr) as portal:
async with tractor.get_arbiter(*reg_addr) as portal:
await portal.cancel_actor()
time.sleep(0.1)
@ -39,16 +39,16 @@ async def test_cancel_remote_arbiter(daemon, arb_addr):
# no arbiter socket should exist
with pytest.raises(OSError):
async with tractor.get_arbiter(*arb_addr) as portal:
async with tractor.get_arbiter(*reg_addr) as portal:
pass
def test_register_duplicate_name(daemon, arb_addr):
def test_register_duplicate_name(daemon, reg_addr):
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as n:
assert not tractor.current_actor().is_arbiter

View File

@ -160,7 +160,7 @@ async def test_required_args(callwith_expecterror):
)
def test_multi_actor_subs_arbiter_pub(
loglevel,
arb_addr,
reg_addr,
pub_actor,
):
"""Try out the neato @pub decorator system.
@ -170,7 +170,7 @@ def test_multi_actor_subs_arbiter_pub(
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
enable_modules=[__name__],
) as n:
@ -255,12 +255,12 @@ def test_multi_actor_subs_arbiter_pub(
def test_single_subactor_pub_multitask_subs(
loglevel,
arb_addr,
reg_addr,
):
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
enable_modules=[__name__],
) as n:

View File

@ -45,7 +45,7 @@ async def short_sleep():
ids=['no_mods', 'this_mod', 'this_mod_bad_func', 'fail_to_import',
'fail_on_syntax'],
)
def test_rpc_errors(arb_addr, to_call, testdir):
def test_rpc_errors(reg_addr, to_call, testdir):
"""Test errors when making various RPC requests to an actor
that either doesn't have the requested module exposed or doesn't define
the named function.
@ -77,7 +77,7 @@ def test_rpc_errors(arb_addr, to_call, testdir):
# spawn a subactor which calls us back
async with tractor.open_nursery(
arbiter_addr=arb_addr,
arbiter_addr=reg_addr,
enable_modules=exposed_mods.copy(),
) as n:

View File

@ -16,14 +16,14 @@ data_to_pass_down = {'doggy': 10, 'kitty': 4}
async def spawn(
is_arbiter: bool,
data: dict,
arb_addr: tuple[str, int],
reg_addr: tuple[str, int],
):
namespaces = [__name__]
await trio.sleep(0.1)
async with tractor.open_root_actor(
arbiter_addr=arb_addr,
arbiter_addr=reg_addr,
):
actor = tractor.current_actor()
@ -41,7 +41,7 @@ async def spawn(
is_arbiter=False,
name='sub-actor',
data=data,
arb_addr=arb_addr,
reg_addr=reg_addr,
enable_modules=namespaces,
)
@ -55,12 +55,12 @@ async def spawn(
return 10
def test_local_arbiter_subactor_global_state(arb_addr):
def test_local_arbiter_subactor_global_state(reg_addr):
result = trio.run(
spawn,
True,
data_to_pass_down,
arb_addr,
reg_addr,
)
assert result == 10
@ -140,7 +140,7 @@ async def check_loglevel(level):
def test_loglevel_propagated_to_subactor(
start_method,
capfd,
arb_addr,
reg_addr,
):
if start_method == 'mp_forkserver':
pytest.skip(
@ -152,7 +152,7 @@ def test_loglevel_propagated_to_subactor(
async with tractor.open_nursery(
name='arbiter',
start_method=start_method,
arbiter_addr=arb_addr,
arbiter_addr=reg_addr,
) as tn:
await tn.run_in_actor(

View File

@ -66,13 +66,13 @@ async def ensure_sequence(
async def open_sequence_streamer(
sequence: list[int],
arb_addr: tuple[str, int],
reg_addr: tuple[str, int],
start_method: str,
) -> tractor.MsgStream:
async with tractor.open_nursery(
arbiter_addr=arb_addr,
arbiter_addr=reg_addr,
start_method=start_method,
) as tn:
@ -93,7 +93,7 @@ async def open_sequence_streamer(
def test_stream_fan_out_to_local_subscriptions(
arb_addr,
reg_addr,
start_method,
):
@ -103,7 +103,7 @@ def test_stream_fan_out_to_local_subscriptions(
async with open_sequence_streamer(
sequence,
arb_addr,
reg_addr,
start_method,
) as stream:
@ -138,7 +138,7 @@ def test_stream_fan_out_to_local_subscriptions(
]
)
def test_consumer_and_parent_maybe_lag(
arb_addr,
reg_addr,
start_method,
task_delays,
):
@ -150,7 +150,7 @@ def test_consumer_and_parent_maybe_lag(
async with open_sequence_streamer(
sequence,
arb_addr,
reg_addr,
start_method,
) as stream:
@ -211,7 +211,7 @@ def test_consumer_and_parent_maybe_lag(
def test_faster_task_to_recv_is_cancelled_by_slower(
arb_addr,
reg_addr,
start_method,
):
'''
@ -225,7 +225,7 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
async with open_sequence_streamer(
sequence,
arb_addr,
reg_addr,
start_method,
) as stream:
@ -302,7 +302,7 @@ def test_subscribe_errors_after_close():
def test_ensure_slow_consumers_lag_out(
arb_addr,
reg_addr,
start_method,
):
'''This is a pure local task test; no tractor