From 0e9457299c45097b71712625b44a9177aa83fd61 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 18 Oct 2023 15:39:20 -0400 Subject: [PATCH] Port all tests to new `reg_addr` fixture name --- tests/test_cancellation.py | 29 ++++---- tests/test_child_manages_service_nursery.py | 2 +- tests/test_context_stream_semantics.py | 2 +- tests/test_debugger.py | 2 +- tests/test_discovery.py | 76 +++++++++++++-------- tests/test_infected_asyncio.py | 26 +++---- tests/test_legacy_one_way_streaming.py | 22 +++--- tests/test_local.py | 12 ++-- tests/test_multi_program.py | 10 +-- tests/test_pubsub.py | 8 +-- tests/test_rpc.py | 4 +- tests/test_spawning.py | 14 ++-- tests/test_task_broadcasting.py | 18 ++--- 13 files changed, 126 insertions(+), 99 deletions(-) diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 657ab8e..ce396ac 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -47,7 +47,7 @@ async def do_nuthin(): ], ids=['no_args', 'unexpected_args'], ) -def test_remote_error(arb_addr, args_err): +def test_remote_error(reg_addr, args_err): """Verify an error raised in a subactor that is propagated to the parent nursery, contains the underlying boxed builtin error type info and causes cancellation and reraising all the @@ -57,7 +57,7 @@ def test_remote_error(arb_addr, args_err): async def main(): async with tractor.open_nursery( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], ) as nursery: # on a remote type error caused by bad input args @@ -97,7 +97,7 @@ def test_remote_error(arb_addr, args_err): assert exc.type == errtype -def test_multierror(arb_addr): +def test_multierror(reg_addr): ''' Verify we raise a ``BaseExceptionGroup`` out of a nursery where more then one actor errors. @@ -105,7 +105,7 @@ def test_multierror(arb_addr): ''' async def main(): async with tractor.open_nursery( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], ) as nursery: await nursery.run_in_actor(assert_err, name='errorer1') @@ -130,14 +130,14 @@ def test_multierror(arb_addr): @pytest.mark.parametrize( 'num_subactors', range(25, 26), ) -def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay): +def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay): """Verify we raise a ``BaseExceptionGroup`` out of a nursery where more then one actor errors and also with a delay before failure to test failure during an ongoing spawning. """ async def main(): async with tractor.open_nursery( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], ) as nursery: for i in range(num_subactors): @@ -175,15 +175,20 @@ async def do_nothing(): @pytest.mark.parametrize('mechanism', ['nursery_cancel', KeyboardInterrupt]) -def test_cancel_single_subactor(arb_addr, mechanism): - """Ensure a ``ActorNursery.start_actor()`` spawned subactor +def test_cancel_single_subactor(reg_addr, mechanism): + ''' + Ensure a ``ActorNursery.start_actor()`` spawned subactor cancels when the nursery is cancelled. - """ + + ''' async def spawn_actor(): - """Spawn an actor that blocks indefinitely. - """ + ''' + Spawn an actor that blocks indefinitely then cancel via + either `ActorNursery.cancel()` or an exception raise. + + ''' async with tractor.open_nursery( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], ) as nursery: portal = await nursery.start_actor( diff --git a/tests/test_child_manages_service_nursery.py b/tests/test_child_manages_service_nursery.py index 806e6d7..fd1ceb8 100644 --- a/tests/test_child_manages_service_nursery.py +++ b/tests/test_child_manages_service_nursery.py @@ -141,7 +141,7 @@ async def open_actor_local_nursery( ) def test_actor_managed_trio_nursery_task_error_cancels_aio( asyncio_mode: bool, - arb_addr + reg_addr: tuple, ): ''' Verify that a ``trio`` nursery created managed in a child actor diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index a0d291d..29d50e8 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -5,7 +5,7 @@ Verify the we raise errors when streams are opened prior to sync-opening a ``tractor.Context`` beforehand. ''' -from contextlib import asynccontextmanager as acm +# from contextlib import asynccontextmanager as acm from itertools import count import platform from typing import Optional diff --git a/tests/test_debugger.py b/tests/test_debugger.py index a44a313..e7bb0d7 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -78,7 +78,7 @@ has_nested_actors = pytest.mark.has_nested_actors def spawn( start_method, testdir, - arb_addr, + reg_addr, ) -> 'pexpect.spawn': if start_method != 'trio': diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 8ba4ebe..8b47700 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -15,19 +15,19 @@ from conftest import tractor_test @tractor_test -async def test_reg_then_unreg(arb_addr): +async def test_reg_then_unreg(reg_addr): actor = tractor.current_actor() assert actor.is_arbiter assert len(actor._registry) == 1 # only self is registered async with tractor.open_nursery( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], ) as n: portal = await n.start_actor('actor', enable_modules=[__name__]) uid = portal.channel.uid - async with tractor.get_arbiter(*arb_addr) as aportal: + async with tractor.get_arbiter(*reg_addr) as aportal: # this local actor should be the arbiter assert actor is aportal.actor @@ -53,15 +53,27 @@ async def hi(): return the_line.format(tractor.current_actor().name) -async def say_hello(other_actor): +async def say_hello( + other_actor: str, + reg_addr: tuple[str, int], +): await trio.sleep(1) # wait for other actor to spawn - async with tractor.find_actor(other_actor) as portal: + async with tractor.find_actor( + other_actor, + registry_addrs=[reg_addr], + ) as portal: assert portal is not None return await portal.run(__name__, 'hi') -async def say_hello_use_wait(other_actor): - async with tractor.wait_for_actor(other_actor) as portal: +async def say_hello_use_wait( + other_actor: str, + reg_addr: tuple[str, int], +): + async with tractor.wait_for_actor( + other_actor, + registry_addr=reg_addr, + ) as portal: assert portal is not None result = await portal.run(__name__, 'hi') return result @@ -69,21 +81,29 @@ async def say_hello_use_wait(other_actor): @tractor_test @pytest.mark.parametrize('func', [say_hello, say_hello_use_wait]) -async def test_trynamic_trio(func, start_method, arb_addr): - """Main tractor entry point, the "master" process (for now - acts as the "director"). - """ +async def test_trynamic_trio( + func, + start_method, + reg_addr, +): + ''' + Root actor acting as the "director" and running one-shot-task-actors + for the directed subs. + + ''' async with tractor.open_nursery() as n: print("Alright... Action!") donny = await n.run_in_actor( func, other_actor='gretchen', + reg_addr=reg_addr, name='donny', ) gretchen = await n.run_in_actor( func, other_actor='donny', + reg_addr=reg_addr, name='gretchen', ) print(await gretchen.result()) @@ -131,7 +151,7 @@ async def unpack_reg(actor_or_portal): async def spawn_and_check_registry( - arb_addr: tuple, + reg_addr: tuple, use_signal: bool, remote_arbiter: bool = False, with_streaming: bool = False, @@ -139,9 +159,9 @@ async def spawn_and_check_registry( ) -> None: async with tractor.open_root_actor( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], ): - async with tractor.get_arbiter(*arb_addr) as portal: + async with tractor.get_arbiter(*reg_addr) as portal: # runtime needs to be up to call this actor = tractor.current_actor() @@ -213,17 +233,19 @@ async def spawn_and_check_registry( def test_subactors_unregister_on_cancel( start_method, use_signal, - arb_addr, + reg_addr, with_streaming, ): - """Verify that cancelling a nursery results in all subactors + ''' + Verify that cancelling a nursery results in all subactors deregistering themselves with the arbiter. - """ + + ''' with pytest.raises(KeyboardInterrupt): trio.run( partial( spawn_and_check_registry, - arb_addr, + reg_addr, use_signal, remote_arbiter=False, with_streaming=with_streaming, @@ -237,7 +259,7 @@ def test_subactors_unregister_on_cancel_remote_daemon( daemon, start_method, use_signal, - arb_addr, + reg_addr, with_streaming, ): """Verify that cancelling a nursery results in all subactors @@ -248,7 +270,7 @@ def test_subactors_unregister_on_cancel_remote_daemon( trio.run( partial( spawn_and_check_registry, - arb_addr, + reg_addr, use_signal, remote_arbiter=True, with_streaming=with_streaming, @@ -262,7 +284,7 @@ async def streamer(agen): async def close_chans_before_nursery( - arb_addr: tuple, + reg_addr: tuple, use_signal: bool, remote_arbiter: bool = False, ) -> None: @@ -275,9 +297,9 @@ async def close_chans_before_nursery( entries_at_end = 1 async with tractor.open_root_actor( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], ): - async with tractor.get_arbiter(*arb_addr) as aportal: + async with tractor.get_arbiter(*reg_addr) as aportal: try: get_reg = partial(unpack_reg, aportal) @@ -329,7 +351,7 @@ async def close_chans_before_nursery( def test_close_channel_explicit( start_method, use_signal, - arb_addr, + reg_addr, ): """Verify that closing a stream explicitly and killing the actor's "root nursery" **before** the containing nursery tears down also @@ -339,7 +361,7 @@ def test_close_channel_explicit( trio.run( partial( close_chans_before_nursery, - arb_addr, + reg_addr, use_signal, remote_arbiter=False, ), @@ -351,7 +373,7 @@ def test_close_channel_explicit_remote_arbiter( daemon, start_method, use_signal, - arb_addr, + reg_addr, ): """Verify that closing a stream explicitly and killing the actor's "root nursery" **before** the containing nursery tears down also @@ -361,7 +383,7 @@ def test_close_channel_explicit_remote_arbiter( trio.run( partial( close_chans_before_nursery, - arb_addr, + reg_addr, use_signal, remote_arbiter=True, ), diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index dd9d681..7674419 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -47,7 +47,7 @@ async def trio_cancels_single_aio_task(): await tractor.to_asyncio.run_task(sleep_forever) -def test_trio_cancels_aio_on_actor_side(arb_addr): +def test_trio_cancels_aio_on_actor_side(reg_addr): ''' Spawn an infected actor that is cancelled by the ``trio`` side task using std cancel scope apis. @@ -55,7 +55,7 @@ def test_trio_cancels_aio_on_actor_side(arb_addr): ''' async def main(): async with tractor.open_nursery( - arbiter_addr=arb_addr + registry_addrs=[reg_addr] ) as n: await n.run_in_actor( trio_cancels_single_aio_task, @@ -94,7 +94,7 @@ async def asyncio_actor( raise -def test_aio_simple_error(arb_addr): +def test_aio_simple_error(reg_addr): ''' Verify a simple remote asyncio error propagates back through trio to the parent actor. @@ -103,7 +103,7 @@ def test_aio_simple_error(arb_addr): ''' async def main(): async with tractor.open_nursery( - arbiter_addr=arb_addr + registry_addrs=[reg_addr] ) as n: await n.run_in_actor( asyncio_actor, @@ -120,7 +120,7 @@ def test_aio_simple_error(arb_addr): assert err.type == AssertionError -def test_tractor_cancels_aio(arb_addr): +def test_tractor_cancels_aio(reg_addr): ''' Verify we can cancel a spawned asyncio task gracefully. @@ -139,7 +139,7 @@ def test_tractor_cancels_aio(arb_addr): trio.run(main) -def test_trio_cancels_aio(arb_addr): +def test_trio_cancels_aio(reg_addr): ''' Much like the above test with ``tractor.Portal.cancel_actor()`` except we just use a standard ``trio`` cancellation api. @@ -194,7 +194,7 @@ async def trio_ctx( ids='parent_actor_cancels_child={}'.format ) def test_context_spawns_aio_task_that_errors( - arb_addr, + reg_addr, parent_cancels: bool, ): ''' @@ -258,7 +258,7 @@ async def aio_cancel(): await sleep_forever() -def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr): +def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr): async def main(): async with tractor.open_nursery() as n: @@ -395,7 +395,7 @@ async def stream_from_aio( 'fan_out', [False, True], ids='fan_out_w_chan_subscribe={}'.format ) -def test_basic_interloop_channel_stream(arb_addr, fan_out): +def test_basic_interloop_channel_stream(reg_addr, fan_out): async def main(): async with tractor.open_nursery() as n: portal = await n.run_in_actor( @@ -409,7 +409,7 @@ def test_basic_interloop_channel_stream(arb_addr, fan_out): # TODO: parametrize the above test and avoid the duplication here? -def test_trio_error_cancels_intertask_chan(arb_addr): +def test_trio_error_cancels_intertask_chan(reg_addr): async def main(): async with tractor.open_nursery() as n: portal = await n.run_in_actor( @@ -428,7 +428,7 @@ def test_trio_error_cancels_intertask_chan(arb_addr): assert exc.type == Exception -def test_trio_closes_early_and_channel_exits(arb_addr): +def test_trio_closes_early_and_channel_exits(reg_addr): async def main(): async with tractor.open_nursery() as n: portal = await n.run_in_actor( @@ -443,7 +443,7 @@ def test_trio_closes_early_and_channel_exits(arb_addr): trio.run(main) -def test_aio_errors_and_channel_propagates_and_closes(arb_addr): +def test_aio_errors_and_channel_propagates_and_closes(reg_addr): async def main(): async with tractor.open_nursery() as n: portal = await n.run_in_actor( @@ -520,7 +520,7 @@ async def trio_to_aio_echo_server( ids='raise_error={}'.format, ) def test_echoserver_detailed_mechanics( - arb_addr, + reg_addr, raise_error_mid_stream, ): diff --git a/tests/test_legacy_one_way_streaming.py b/tests/test_legacy_one_way_streaming.py index 17e94ba..0cbda4d 100644 --- a/tests/test_legacy_one_way_streaming.py +++ b/tests/test_legacy_one_way_streaming.py @@ -55,7 +55,7 @@ async def context_stream( async def stream_from_single_subactor( - arb_addr, + reg_addr, start_method, stream_func, ): @@ -64,7 +64,7 @@ async def stream_from_single_subactor( # only one per host address, spawns an actor if None async with tractor.open_nursery( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], start_method=start_method, ) as nursery: @@ -115,13 +115,13 @@ async def stream_from_single_subactor( @pytest.mark.parametrize( 'stream_func', [async_gen_stream, context_stream] ) -def test_stream_from_single_subactor(arb_addr, start_method, stream_func): +def test_stream_from_single_subactor(reg_addr, start_method, stream_func): """Verify streaming from a spawned async generator. """ trio.run( partial( stream_from_single_subactor, - arb_addr, + reg_addr, start_method, stream_func=stream_func, ), @@ -225,14 +225,14 @@ async def a_quadruple_example(): return result_stream -async def cancel_after(wait, arb_addr): - async with tractor.open_root_actor(arbiter_addr=arb_addr): +async def cancel_after(wait, reg_addr): + async with tractor.open_root_actor(registry_addrs=[reg_addr]): with trio.move_on_after(wait): return await a_quadruple_example() @pytest.fixture(scope='module') -def time_quad_ex(arb_addr, ci_env, spawn_backend): +def time_quad_ex(reg_addr, ci_env, spawn_backend): if spawn_backend == 'mp': """no idea but the mp *nix runs are flaking out here often... """ @@ -240,7 +240,7 @@ def time_quad_ex(arb_addr, ci_env, spawn_backend): timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4 start = time.time() - results = trio.run(cancel_after, timeout, arb_addr) + results = trio.run(cancel_after, timeout, reg_addr) diff = time.time() - start assert results return results, diff @@ -260,14 +260,14 @@ def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend): list(map(lambda i: i/10, range(3, 9))) ) def test_not_fast_enough_quad( - arb_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend + reg_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend ): """Verify we can cancel midway through the quad example and all actors cancel gracefully. """ results, diff = time_quad_ex delay = max(diff - cancel_delay, 0) - results = trio.run(cancel_after, delay, arb_addr) + results = trio.run(cancel_after, delay, reg_addr) system = platform.system() if system in ('Windows', 'Darwin') and results is not None: # In CI envoirments it seems later runs are quicker then the first @@ -280,7 +280,7 @@ def test_not_fast_enough_quad( @tractor_test async def test_respawn_consumer_task( - arb_addr, + reg_addr, spawn_backend, loglevel, ): diff --git a/tests/test_local.py b/tests/test_local.py index 97a8328..009d0d7 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -24,7 +24,7 @@ async def test_no_runtime(): @tractor_test -async def test_self_is_registered(arb_addr): +async def test_self_is_registered(reg_addr): "Verify waiting on the arbiter to register itself using the standard api." actor = tractor.current_actor() assert actor.is_arbiter @@ -34,20 +34,20 @@ async def test_self_is_registered(arb_addr): @tractor_test -async def test_self_is_registered_localportal(arb_addr): +async def test_self_is_registered_localportal(reg_addr): "Verify waiting on the arbiter to register itself using a local portal." actor = tractor.current_actor() assert actor.is_arbiter - async with tractor.get_arbiter(*arb_addr) as portal: + async with tractor.get_arbiter(*reg_addr) as portal: assert isinstance(portal, tractor._portal.LocalPortal) with trio.fail_after(0.2): sockaddr = await portal.run_from_ns( 'self', 'wait_for_actor', name='root') - assert sockaddr[0] == arb_addr + assert sockaddr[0] == reg_addr -def test_local_actor_async_func(arb_addr): +def test_local_actor_async_func(reg_addr): """Verify a simple async function in-process. """ nums = [] @@ -55,7 +55,7 @@ def test_local_actor_async_func(arb_addr): async def print_loop(): async with tractor.open_root_actor( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], ): # arbiter is started in-proc if dne assert tractor.current_actor().is_arbiter diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py index e7a3ac5..d1ee0f5 100644 --- a/tests/test_multi_program.py +++ b/tests/test_multi_program.py @@ -28,9 +28,9 @@ def test_abort_on_sigint(daemon): @tractor_test -async def test_cancel_remote_arbiter(daemon, arb_addr): +async def test_cancel_remote_arbiter(daemon, reg_addr): assert not tractor.current_actor().is_arbiter - async with tractor.get_arbiter(*arb_addr) as portal: + async with tractor.get_arbiter(*reg_addr) as portal: await portal.cancel_actor() time.sleep(0.1) @@ -39,16 +39,16 @@ async def test_cancel_remote_arbiter(daemon, arb_addr): # no arbiter socket should exist with pytest.raises(OSError): - async with tractor.get_arbiter(*arb_addr) as portal: + async with tractor.get_arbiter(*reg_addr) as portal: pass -def test_register_duplicate_name(daemon, arb_addr): +def test_register_duplicate_name(daemon, reg_addr): async def main(): async with tractor.open_nursery( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], ) as n: assert not tractor.current_actor().is_arbiter diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index ababcb5..20554fa 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -160,7 +160,7 @@ async def test_required_args(callwith_expecterror): ) def test_multi_actor_subs_arbiter_pub( loglevel, - arb_addr, + reg_addr, pub_actor, ): """Try out the neato @pub decorator system. @@ -170,7 +170,7 @@ def test_multi_actor_subs_arbiter_pub( async def main(): async with tractor.open_nursery( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], enable_modules=[__name__], ) as n: @@ -255,12 +255,12 @@ def test_multi_actor_subs_arbiter_pub( def test_single_subactor_pub_multitask_subs( loglevel, - arb_addr, + reg_addr, ): async def main(): async with tractor.open_nursery( - arbiter_addr=arb_addr, + registry_addrs=[reg_addr], enable_modules=[__name__], ) as n: diff --git a/tests/test_rpc.py b/tests/test_rpc.py index 6d15896..7ede231 100644 --- a/tests/test_rpc.py +++ b/tests/test_rpc.py @@ -45,7 +45,7 @@ async def short_sleep(): ids=['no_mods', 'this_mod', 'this_mod_bad_func', 'fail_to_import', 'fail_on_syntax'], ) -def test_rpc_errors(arb_addr, to_call, testdir): +def test_rpc_errors(reg_addr, to_call, testdir): """Test errors when making various RPC requests to an actor that either doesn't have the requested module exposed or doesn't define the named function. @@ -77,7 +77,7 @@ def test_rpc_errors(arb_addr, to_call, testdir): # spawn a subactor which calls us back async with tractor.open_nursery( - arbiter_addr=arb_addr, + arbiter_addr=reg_addr, enable_modules=exposed_mods.copy(), ) as n: diff --git a/tests/test_spawning.py b/tests/test_spawning.py index 17798c0..0f6a8cf 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -16,14 +16,14 @@ data_to_pass_down = {'doggy': 10, 'kitty': 4} async def spawn( is_arbiter: bool, data: dict, - arb_addr: tuple[str, int], + reg_addr: tuple[str, int], ): namespaces = [__name__] await trio.sleep(0.1) async with tractor.open_root_actor( - arbiter_addr=arb_addr, + arbiter_addr=reg_addr, ): actor = tractor.current_actor() @@ -41,7 +41,7 @@ async def spawn( is_arbiter=False, name='sub-actor', data=data, - arb_addr=arb_addr, + reg_addr=reg_addr, enable_modules=namespaces, ) @@ -55,12 +55,12 @@ async def spawn( return 10 -def test_local_arbiter_subactor_global_state(arb_addr): +def test_local_arbiter_subactor_global_state(reg_addr): result = trio.run( spawn, True, data_to_pass_down, - arb_addr, + reg_addr, ) assert result == 10 @@ -140,7 +140,7 @@ async def check_loglevel(level): def test_loglevel_propagated_to_subactor( start_method, capfd, - arb_addr, + reg_addr, ): if start_method == 'mp_forkserver': pytest.skip( @@ -152,7 +152,7 @@ def test_loglevel_propagated_to_subactor( async with tractor.open_nursery( name='arbiter', start_method=start_method, - arbiter_addr=arb_addr, + arbiter_addr=reg_addr, ) as tn: await tn.run_in_actor( diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index 5e18e10..d7a2913 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -66,13 +66,13 @@ async def ensure_sequence( async def open_sequence_streamer( sequence: list[int], - arb_addr: tuple[str, int], + reg_addr: tuple[str, int], start_method: str, ) -> tractor.MsgStream: async with tractor.open_nursery( - arbiter_addr=arb_addr, + arbiter_addr=reg_addr, start_method=start_method, ) as tn: @@ -93,7 +93,7 @@ async def open_sequence_streamer( def test_stream_fan_out_to_local_subscriptions( - arb_addr, + reg_addr, start_method, ): @@ -103,7 +103,7 @@ def test_stream_fan_out_to_local_subscriptions( async with open_sequence_streamer( sequence, - arb_addr, + reg_addr, start_method, ) as stream: @@ -138,7 +138,7 @@ def test_stream_fan_out_to_local_subscriptions( ] ) def test_consumer_and_parent_maybe_lag( - arb_addr, + reg_addr, start_method, task_delays, ): @@ -150,7 +150,7 @@ def test_consumer_and_parent_maybe_lag( async with open_sequence_streamer( sequence, - arb_addr, + reg_addr, start_method, ) as stream: @@ -211,7 +211,7 @@ def test_consumer_and_parent_maybe_lag( def test_faster_task_to_recv_is_cancelled_by_slower( - arb_addr, + reg_addr, start_method, ): ''' @@ -225,7 +225,7 @@ def test_faster_task_to_recv_is_cancelled_by_slower( async with open_sequence_streamer( sequence, - arb_addr, + reg_addr, start_method, ) as stream: @@ -302,7 +302,7 @@ def test_subscribe_errors_after_close(): def test_ensure_slow_consumers_lag_out( - arb_addr, + reg_addr, start_method, ): '''This is a pure local task test; no tractor