From bc13599e1f38cefb5f936bc22311aa62925747f6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 18 Mar 2025 21:44:50 -0400 Subject: [PATCH] Revert "Port all tests to new `reg_addr` fixture name" This reverts commit 715348c5c2d2d0ec793fb2dc47479c38b8a54c49. --- tests/test_cancellation.py | 31 ++++----- tests/test_child_manages_service_nursery.py | 2 +- tests/test_debugger.py | 2 +- tests/test_discovery.py | 76 ++++++++------------- tests/test_infected_asyncio.py | 26 +++---- tests/test_legacy_one_way_streaming.py | 22 +++--- tests/test_local.py | 12 ++-- tests/test_multi_program.py | 10 +-- tests/test_pubsub.py | 8 +-- tests/test_rpc.py | 4 +- tests/test_spawning.py | 14 ++-- tests/test_task_broadcasting.py | 18 ++--- 12 files changed, 99 insertions(+), 126 deletions(-) diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index b8c14af3..14e4d0ae 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -14,7 +14,7 @@ import tractor from tractor._testing import ( tractor_test, ) -from conftest import no_windows +from .conftest import no_windows def is_win(): @@ -45,7 +45,7 @@ async def do_nuthin(): ], ids=['no_args', 'unexpected_args'], ) -def test_remote_error(reg_addr, args_err): +def test_remote_error(arb_addr, args_err): ''' Verify an error raised in a subactor that is propagated to the parent nursery, contains the underlying boxed builtin @@ -57,7 +57,7 @@ def test_remote_error(reg_addr, args_err): async def main(): async with tractor.open_nursery( - registry_addrs=[reg_addr], + arbiter_addr=arb_addr, ) as nursery: # on a remote type error caused by bad input args @@ -99,7 +99,7 @@ def test_remote_error(reg_addr, args_err): assert exc.type == errtype -def test_multierror(reg_addr): +def test_multierror(arb_addr): ''' Verify we raise a ``BaseExceptionGroup`` out of a nursery where more then one actor errors. @@ -107,7 +107,7 @@ def test_multierror(reg_addr): ''' async def main(): async with tractor.open_nursery( - registry_addrs=[reg_addr], + arbiter_addr=arb_addr, ) as nursery: await nursery.run_in_actor(assert_err, name='errorer1') @@ -132,14 +132,14 @@ def test_multierror(reg_addr): @pytest.mark.parametrize( 'num_subactors', range(25, 26), ) -def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay): +def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay): """Verify we raise a ``BaseExceptionGroup`` out of a nursery where more then one actor errors and also with a delay before failure to test failure during an ongoing spawning. """ async def main(): async with tractor.open_nursery( - registry_addrs=[reg_addr], + arbiter_addr=arb_addr, ) as nursery: for i in range(num_subactors): @@ -177,20 +177,15 @@ async def do_nothing(): @pytest.mark.parametrize('mechanism', ['nursery_cancel', KeyboardInterrupt]) -def test_cancel_single_subactor(reg_addr, mechanism): - ''' - Ensure a ``ActorNursery.start_actor()`` spawned subactor +def test_cancel_single_subactor(arb_addr, mechanism): + """Ensure a ``ActorNursery.start_actor()`` spawned subactor cancels when the nursery is cancelled. - - ''' + """ async def spawn_actor(): - ''' - Spawn an actor that blocks indefinitely then cancel via - either `ActorNursery.cancel()` or an exception raise. - - ''' + """Spawn an actor that blocks indefinitely. + """ async with tractor.open_nursery( - registry_addrs=[reg_addr], + arbiter_addr=arb_addr, ) as nursery: portal = await nursery.start_actor( diff --git a/tests/test_child_manages_service_nursery.py b/tests/test_child_manages_service_nursery.py index 350f939b..1dcbe031 100644 --- a/tests/test_child_manages_service_nursery.py +++ b/tests/test_child_manages_service_nursery.py @@ -142,7 +142,7 @@ async def open_actor_local_nursery( ) def test_actor_managed_trio_nursery_task_error_cancels_aio( asyncio_mode: bool, - reg_addr: tuple, + arb_addr ): ''' Verify that a ``trio`` nursery created managed in a child actor diff --git a/tests/test_debugger.py b/tests/test_debugger.py index 25f6dad2..889e7c74 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -78,7 +78,7 @@ has_nested_actors = pytest.mark.has_nested_actors def spawn( start_method, testdir, - reg_addr, + arb_addr, ) -> 'pexpect.spawn': if start_method != 'trio': diff --git a/tests/test_discovery.py b/tests/test_discovery.py index cd9dc022..b56c3a2e 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -14,19 +14,19 @@ import trio @tractor_test -async def test_reg_then_unreg(reg_addr): +async def test_reg_then_unreg(arb_addr): actor = tractor.current_actor() assert actor.is_arbiter assert len(actor._registry) == 1 # only self is registered async with tractor.open_nursery( - registry_addrs=[reg_addr], + arbiter_addr=arb_addr, ) as n: portal = await n.start_actor('actor', enable_modules=[__name__]) uid = portal.channel.uid - async with tractor.get_arbiter(*reg_addr) as aportal: + async with tractor.get_arbiter(*arb_addr) as aportal: # this local actor should be the arbiter assert actor is aportal.actor @@ -52,27 +52,15 @@ async def hi(): return the_line.format(tractor.current_actor().name) -async def say_hello( - other_actor: str, - reg_addr: tuple[str, int], -): +async def say_hello(other_actor): await trio.sleep(1) # wait for other actor to spawn - async with tractor.find_actor( - other_actor, - registry_addrs=[reg_addr], - ) as portal: + async with tractor.find_actor(other_actor) as portal: assert portal is not None return await portal.run(__name__, 'hi') -async def say_hello_use_wait( - other_actor: str, - reg_addr: tuple[str, int], -): - async with tractor.wait_for_actor( - other_actor, - registry_addr=reg_addr, - ) as portal: +async def say_hello_use_wait(other_actor): + async with tractor.wait_for_actor(other_actor) as portal: assert portal is not None result = await portal.run(__name__, 'hi') return result @@ -80,29 +68,21 @@ async def say_hello_use_wait( @tractor_test @pytest.mark.parametrize('func', [say_hello, say_hello_use_wait]) -async def test_trynamic_trio( - func, - start_method, - reg_addr, -): - ''' - Root actor acting as the "director" and running one-shot-task-actors - for the directed subs. - - ''' +async def test_trynamic_trio(func, start_method, arb_addr): + """Main tractor entry point, the "master" process (for now + acts as the "director"). + """ async with tractor.open_nursery() as n: print("Alright... Action!") donny = await n.run_in_actor( func, other_actor='gretchen', - reg_addr=reg_addr, name='donny', ) gretchen = await n.run_in_actor( func, other_actor='donny', - reg_addr=reg_addr, name='gretchen', ) print(await gretchen.result()) @@ -150,7 +130,7 @@ async def unpack_reg(actor_or_portal): async def spawn_and_check_registry( - reg_addr: tuple, + arb_addr: tuple, use_signal: bool, remote_arbiter: bool = False, with_streaming: bool = False, @@ -158,9 +138,9 @@ async def spawn_and_check_registry( ) -> None: async with tractor.open_root_actor( - registry_addrs=[reg_addr], + arbiter_addr=arb_addr, ): - async with tractor.get_arbiter(*reg_addr) as portal: + async with tractor.get_arbiter(*arb_addr) as portal: # runtime needs to be up to call this actor = tractor.current_actor() @@ -232,19 +212,17 @@ async def spawn_and_check_registry( def test_subactors_unregister_on_cancel( start_method, use_signal, - reg_addr, + arb_addr, with_streaming, ): - ''' - Verify that cancelling a nursery results in all subactors + """Verify that cancelling a nursery results in all subactors deregistering themselves with the arbiter. - - ''' + """ with pytest.raises(KeyboardInterrupt): trio.run( partial( spawn_and_check_registry, - reg_addr, + arb_addr, use_signal, remote_arbiter=False, with_streaming=with_streaming, @@ -258,7 +236,7 @@ def test_subactors_unregister_on_cancel_remote_daemon( daemon, start_method, use_signal, - reg_addr, + arb_addr, with_streaming, ): """Verify that cancelling a nursery results in all subactors @@ -269,7 +247,7 @@ def test_subactors_unregister_on_cancel_remote_daemon( trio.run( partial( spawn_and_check_registry, - reg_addr, + arb_addr, use_signal, remote_arbiter=True, with_streaming=with_streaming, @@ -283,7 +261,7 @@ async def streamer(agen): async def close_chans_before_nursery( - reg_addr: tuple, + arb_addr: tuple, use_signal: bool, remote_arbiter: bool = False, ) -> None: @@ -296,9 +274,9 @@ async def close_chans_before_nursery( entries_at_end = 1 async with tractor.open_root_actor( - registry_addrs=[reg_addr], + arbiter_addr=arb_addr, ): - async with tractor.get_arbiter(*reg_addr) as aportal: + async with tractor.get_arbiter(*arb_addr) as aportal: try: get_reg = partial(unpack_reg, aportal) @@ -350,7 +328,7 @@ async def close_chans_before_nursery( def test_close_channel_explicit( start_method, use_signal, - reg_addr, + arb_addr, ): """Verify that closing a stream explicitly and killing the actor's "root nursery" **before** the containing nursery tears down also @@ -360,7 +338,7 @@ def test_close_channel_explicit( trio.run( partial( close_chans_before_nursery, - reg_addr, + arb_addr, use_signal, remote_arbiter=False, ), @@ -372,7 +350,7 @@ def test_close_channel_explicit_remote_arbiter( daemon, start_method, use_signal, - reg_addr, + arb_addr, ): """Verify that closing a stream explicitly and killing the actor's "root nursery" **before** the containing nursery tears down also @@ -382,7 +360,7 @@ def test_close_channel_explicit_remote_arbiter( trio.run( partial( close_chans_before_nursery, - reg_addr, + arb_addr, use_signal, remote_arbiter=True, ), diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 568708a2..f9670225 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -47,7 +47,7 @@ async def trio_cancels_single_aio_task(): await tractor.to_asyncio.run_task(sleep_forever) -def test_trio_cancels_aio_on_actor_side(reg_addr): +def test_trio_cancels_aio_on_actor_side(arb_addr): ''' Spawn an infected actor that is cancelled by the ``trio`` side task using std cancel scope apis. @@ -55,7 +55,7 @@ def test_trio_cancels_aio_on_actor_side(reg_addr): ''' async def main(): async with tractor.open_nursery( - registry_addrs=[reg_addr] + arbiter_addr=arb_addr ) as n: await n.run_in_actor( trio_cancels_single_aio_task, @@ -94,7 +94,7 @@ async def asyncio_actor( raise -def test_aio_simple_error(reg_addr): +def test_aio_simple_error(arb_addr): ''' Verify a simple remote asyncio error propagates back through trio to the parent actor. @@ -103,7 +103,7 @@ def test_aio_simple_error(reg_addr): ''' async def main(): async with tractor.open_nursery( - registry_addrs=[reg_addr] + arbiter_addr=arb_addr ) as n: await n.run_in_actor( asyncio_actor, @@ -131,7 +131,7 @@ def test_aio_simple_error(reg_addr): assert err.type == AssertionError -def test_tractor_cancels_aio(reg_addr): +def test_tractor_cancels_aio(arb_addr): ''' Verify we can cancel a spawned asyncio task gracefully. @@ -150,7 +150,7 @@ def test_tractor_cancels_aio(reg_addr): trio.run(main) -def test_trio_cancels_aio(reg_addr): +def test_trio_cancels_aio(arb_addr): ''' Much like the above test with ``tractor.Portal.cancel_actor()`` except we just use a standard ``trio`` cancellation api. @@ -206,7 +206,7 @@ async def trio_ctx( ids='parent_actor_cancels_child={}'.format ) def test_context_spawns_aio_task_that_errors( - reg_addr, + arb_addr, parent_cancels: bool, ): ''' @@ -288,7 +288,7 @@ async def aio_cancel(): await sleep_forever() -def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr): +def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr): async def main(): async with tractor.open_nursery() as n: @@ -436,7 +436,7 @@ async def stream_from_aio( 'fan_out', [False, True], ids='fan_out_w_chan_subscribe={}'.format ) -def test_basic_interloop_channel_stream(reg_addr, fan_out): +def test_basic_interloop_channel_stream(arb_addr, fan_out): async def main(): async with tractor.open_nursery() as n: portal = await n.run_in_actor( @@ -450,7 +450,7 @@ def test_basic_interloop_channel_stream(reg_addr, fan_out): # TODO: parametrize the above test and avoid the duplication here? -def test_trio_error_cancels_intertask_chan(reg_addr): +def test_trio_error_cancels_intertask_chan(arb_addr): async def main(): async with tractor.open_nursery() as n: portal = await n.run_in_actor( @@ -469,7 +469,7 @@ def test_trio_error_cancels_intertask_chan(reg_addr): assert exc.type == Exception -def test_trio_closes_early_and_channel_exits(reg_addr): +def test_trio_closes_early_and_channel_exits(arb_addr): async def main(): async with tractor.open_nursery() as n: portal = await n.run_in_actor( @@ -484,7 +484,7 @@ def test_trio_closes_early_and_channel_exits(reg_addr): trio.run(main) -def test_aio_errors_and_channel_propagates_and_closes(reg_addr): +def test_aio_errors_and_channel_propagates_and_closes(arb_addr): async def main(): async with tractor.open_nursery() as n: portal = await n.run_in_actor( @@ -561,7 +561,7 @@ async def trio_to_aio_echo_server( ids='raise_error={}'.format, ) def test_echoserver_detailed_mechanics( - reg_addr, + arb_addr, raise_error_mid_stream, ): diff --git a/tests/test_legacy_one_way_streaming.py b/tests/test_legacy_one_way_streaming.py index 1e7ec987..5d7787fa 100644 --- a/tests/test_legacy_one_way_streaming.py +++ b/tests/test_legacy_one_way_streaming.py @@ -55,7 +55,7 @@ async def context_stream( async def stream_from_single_subactor( - reg_addr, + arb_addr, start_method, stream_func, ): @@ -64,7 +64,7 @@ async def stream_from_single_subactor( # only one per host address, spawns an actor if None async with tractor.open_nursery( - registry_addrs=[reg_addr], + arbiter_addr=arb_addr, start_method=start_method, ) as nursery: @@ -115,13 +115,13 @@ async def stream_from_single_subactor( @pytest.mark.parametrize( 'stream_func', [async_gen_stream, context_stream] ) -def test_stream_from_single_subactor(reg_addr, start_method, stream_func): +def test_stream_from_single_subactor(arb_addr, start_method, stream_func): """Verify streaming from a spawned async generator. """ trio.run( partial( stream_from_single_subactor, - reg_addr, + arb_addr, start_method, stream_func=stream_func, ), @@ -225,14 +225,14 @@ async def a_quadruple_example(): return result_stream -async def cancel_after(wait, reg_addr): - async with tractor.open_root_actor(registry_addrs=[reg_addr]): +async def cancel_after(wait, arb_addr): + async with tractor.open_root_actor(arbiter_addr=arb_addr): with trio.move_on_after(wait): return await a_quadruple_example() @pytest.fixture(scope='module') -def time_quad_ex(reg_addr, ci_env, spawn_backend): +def time_quad_ex(arb_addr, ci_env, spawn_backend): if spawn_backend == 'mp': """no idea but the mp *nix runs are flaking out here often... """ @@ -240,7 +240,7 @@ def time_quad_ex(reg_addr, ci_env, spawn_backend): timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4 start = time.time() - results = trio.run(cancel_after, timeout, reg_addr) + results = trio.run(cancel_after, timeout, arb_addr) diff = time.time() - start assert results return results, diff @@ -260,14 +260,14 @@ def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend): list(map(lambda i: i/10, range(3, 9))) ) def test_not_fast_enough_quad( - reg_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend + arb_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend ): """Verify we can cancel midway through the quad example and all actors cancel gracefully. """ results, diff = time_quad_ex delay = max(diff - cancel_delay, 0) - results = trio.run(cancel_after, delay, reg_addr) + results = trio.run(cancel_after, delay, arb_addr) system = platform.system() if system in ('Windows', 'Darwin') and results is not None: # In CI envoirments it seems later runs are quicker then the first @@ -280,7 +280,7 @@ def test_not_fast_enough_quad( @tractor_test async def test_respawn_consumer_task( - reg_addr, + arb_addr, spawn_backend, loglevel, ): diff --git a/tests/test_local.py b/tests/test_local.py index a019d771..bb013043 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -24,7 +24,7 @@ async def test_no_runtime(): @tractor_test -async def test_self_is_registered(reg_addr): +async def test_self_is_registered(arb_addr): "Verify waiting on the arbiter to register itself using the standard api." actor = tractor.current_actor() assert actor.is_arbiter @@ -34,20 +34,20 @@ async def test_self_is_registered(reg_addr): @tractor_test -async def test_self_is_registered_localportal(reg_addr): +async def test_self_is_registered_localportal(arb_addr): "Verify waiting on the arbiter to register itself using a local portal." actor = tractor.current_actor() assert actor.is_arbiter - async with tractor.get_arbiter(*reg_addr) as portal: + async with tractor.get_arbiter(*arb_addr) as portal: assert isinstance(portal, tractor._portal.LocalPortal) with trio.fail_after(0.2): sockaddr = await portal.run_from_ns( 'self', 'wait_for_actor', name='root') - assert sockaddr[0] == reg_addr + assert sockaddr[0] == arb_addr -def test_local_actor_async_func(reg_addr): +def test_local_actor_async_func(arb_addr): """Verify a simple async function in-process. """ nums = [] @@ -55,7 +55,7 @@ def test_local_actor_async_func(reg_addr): async def print_loop(): async with tractor.open_root_actor( - registry_addrs=[reg_addr], + arbiter_addr=arb_addr, ): # arbiter is started in-proc if dne assert tractor.current_actor().is_arbiter diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py index 92f4c52d..d3eadabf 100644 --- a/tests/test_multi_program.py +++ b/tests/test_multi_program.py @@ -30,9 +30,9 @@ def test_abort_on_sigint(daemon): @tractor_test -async def test_cancel_remote_arbiter(daemon, reg_addr): +async def test_cancel_remote_arbiter(daemon, arb_addr): assert not tractor.current_actor().is_arbiter - async with tractor.get_arbiter(*reg_addr) as portal: + async with tractor.get_arbiter(*arb_addr) as portal: await portal.cancel_actor() time.sleep(0.1) @@ -41,16 +41,16 @@ async def test_cancel_remote_arbiter(daemon, reg_addr): # no arbiter socket should exist with pytest.raises(OSError): - async with tractor.get_arbiter(*reg_addr) as portal: + async with tractor.get_arbiter(*arb_addr) as portal: pass -def test_register_duplicate_name(daemon, reg_addr): +def test_register_duplicate_name(daemon, arb_addr): async def main(): async with tractor.open_nursery( - registry_addrs=[reg_addr], + arbiter_addr=arb_addr, ) as n: assert not tractor.current_actor().is_arbiter diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 6d416f89..69f4c513 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -159,7 +159,7 @@ async def test_required_args(callwith_expecterror): ) def test_multi_actor_subs_arbiter_pub( loglevel, - reg_addr, + arb_addr, pub_actor, ): """Try out the neato @pub decorator system. @@ -169,7 +169,7 @@ def test_multi_actor_subs_arbiter_pub( async def main(): async with tractor.open_nursery( - registry_addrs=[reg_addr], + arbiter_addr=arb_addr, enable_modules=[__name__], ) as n: @@ -254,12 +254,12 @@ def test_multi_actor_subs_arbiter_pub( def test_single_subactor_pub_multitask_subs( loglevel, - reg_addr, + arb_addr, ): async def main(): async with tractor.open_nursery( - registry_addrs=[reg_addr], + arbiter_addr=arb_addr, enable_modules=[__name__], ) as n: diff --git a/tests/test_rpc.py b/tests/test_rpc.py index 71f3258b..b16f2f1d 100644 --- a/tests/test_rpc.py +++ b/tests/test_rpc.py @@ -52,7 +52,7 @@ async def short_sleep(): 'fail_on_syntax', ], ) -def test_rpc_errors(reg_addr, to_call, testdir): +def test_rpc_errors(arb_addr, to_call, testdir): """Test errors when making various RPC requests to an actor that either doesn't have the requested module exposed or doesn't define the named function. @@ -84,7 +84,7 @@ def test_rpc_errors(reg_addr, to_call, testdir): # spawn a subactor which calls us back async with tractor.open_nursery( - arbiter_addr=reg_addr, + arbiter_addr=arb_addr, enable_modules=exposed_mods.copy(), ) as n: diff --git a/tests/test_spawning.py b/tests/test_spawning.py index 6a4b2988..3f4772e9 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -16,14 +16,14 @@ data_to_pass_down = {'doggy': 10, 'kitty': 4} async def spawn( is_arbiter: bool, data: dict, - reg_addr: tuple[str, int], + arb_addr: tuple[str, int], ): namespaces = [__name__] await trio.sleep(0.1) async with tractor.open_root_actor( - arbiter_addr=reg_addr, + arbiter_addr=arb_addr, ): actor = tractor.current_actor() @@ -41,7 +41,7 @@ async def spawn( is_arbiter=False, name='sub-actor', data=data, - reg_addr=reg_addr, + arb_addr=arb_addr, enable_modules=namespaces, ) @@ -55,12 +55,12 @@ async def spawn( return 10 -def test_local_arbiter_subactor_global_state(reg_addr): +def test_local_arbiter_subactor_global_state(arb_addr): result = trio.run( spawn, True, data_to_pass_down, - reg_addr, + arb_addr, ) assert result == 10 @@ -140,7 +140,7 @@ async def check_loglevel(level): def test_loglevel_propagated_to_subactor( start_method, capfd, - reg_addr, + arb_addr, ): if start_method == 'mp_forkserver': pytest.skip( @@ -152,7 +152,7 @@ def test_loglevel_propagated_to_subactor( async with tractor.open_nursery( name='arbiter', start_method=start_method, - arbiter_addr=reg_addr, + arbiter_addr=arb_addr, ) as tn: await tn.run_in_actor( diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index d7a29134..5e18e10a 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -66,13 +66,13 @@ async def ensure_sequence( async def open_sequence_streamer( sequence: list[int], - reg_addr: tuple[str, int], + arb_addr: tuple[str, int], start_method: str, ) -> tractor.MsgStream: async with tractor.open_nursery( - arbiter_addr=reg_addr, + arbiter_addr=arb_addr, start_method=start_method, ) as tn: @@ -93,7 +93,7 @@ async def open_sequence_streamer( def test_stream_fan_out_to_local_subscriptions( - reg_addr, + arb_addr, start_method, ): @@ -103,7 +103,7 @@ def test_stream_fan_out_to_local_subscriptions( async with open_sequence_streamer( sequence, - reg_addr, + arb_addr, start_method, ) as stream: @@ -138,7 +138,7 @@ def test_stream_fan_out_to_local_subscriptions( ] ) def test_consumer_and_parent_maybe_lag( - reg_addr, + arb_addr, start_method, task_delays, ): @@ -150,7 +150,7 @@ def test_consumer_and_parent_maybe_lag( async with open_sequence_streamer( sequence, - reg_addr, + arb_addr, start_method, ) as stream: @@ -211,7 +211,7 @@ def test_consumer_and_parent_maybe_lag( def test_faster_task_to_recv_is_cancelled_by_slower( - reg_addr, + arb_addr, start_method, ): ''' @@ -225,7 +225,7 @@ def test_faster_task_to_recv_is_cancelled_by_slower( async with open_sequence_streamer( sequence, - reg_addr, + arb_addr, start_method, ) as stream: @@ -302,7 +302,7 @@ def test_subscribe_errors_after_close(): def test_ensure_slow_consumers_lag_out( - reg_addr, + arb_addr, start_method, ): '''This is a pure local task test; no tractor