forked from goodboy/tractor
				
			Port all tests to new `reg_addr` fixture name
							parent
							
								
									fdf0c43bfa
								
							
						
					
					
						commit
						715348c5c2
					
				|  | @ -47,7 +47,7 @@ async def do_nuthin(): | |||
|     ], | ||||
|     ids=['no_args', 'unexpected_args'], | ||||
| ) | ||||
| def test_remote_error(arb_addr, args_err): | ||||
| def test_remote_error(reg_addr, args_err): | ||||
|     """Verify an error raised in a subactor that is propagated | ||||
|     to the parent nursery, contains the underlying boxed builtin | ||||
|     error type info and causes cancellation and reraising all the | ||||
|  | @ -57,7 +57,7 @@ def test_remote_error(arb_addr, args_err): | |||
| 
 | ||||
|     async def main(): | ||||
|         async with tractor.open_nursery( | ||||
|             arbiter_addr=arb_addr, | ||||
|             registry_addrs=[reg_addr], | ||||
|         ) as nursery: | ||||
| 
 | ||||
|             # on a remote type error caused by bad input args | ||||
|  | @ -97,7 +97,7 @@ def test_remote_error(arb_addr, args_err): | |||
|             assert exc.type == errtype | ||||
| 
 | ||||
| 
 | ||||
| def test_multierror(arb_addr): | ||||
| def test_multierror(reg_addr): | ||||
|     ''' | ||||
|     Verify we raise a ``BaseExceptionGroup`` out of a nursery where | ||||
|     more then one actor errors. | ||||
|  | @ -105,7 +105,7 @@ def test_multierror(arb_addr): | |||
|     ''' | ||||
|     async def main(): | ||||
|         async with tractor.open_nursery( | ||||
|             arbiter_addr=arb_addr, | ||||
|             registry_addrs=[reg_addr], | ||||
|         ) as nursery: | ||||
| 
 | ||||
|             await nursery.run_in_actor(assert_err, name='errorer1') | ||||
|  | @ -130,14 +130,14 @@ def test_multierror(arb_addr): | |||
| @pytest.mark.parametrize( | ||||
|     'num_subactors', range(25, 26), | ||||
| ) | ||||
| def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay): | ||||
| def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay): | ||||
|     """Verify we raise a ``BaseExceptionGroup`` out of a nursery where | ||||
|     more then one actor errors and also with a delay before failure | ||||
|     to test failure during an ongoing spawning. | ||||
|     """ | ||||
|     async def main(): | ||||
|         async with tractor.open_nursery( | ||||
|             arbiter_addr=arb_addr, | ||||
|             registry_addrs=[reg_addr], | ||||
|         ) as nursery: | ||||
| 
 | ||||
|             for i in range(num_subactors): | ||||
|  | @ -175,15 +175,20 @@ async def do_nothing(): | |||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize('mechanism', ['nursery_cancel', KeyboardInterrupt]) | ||||
| def test_cancel_single_subactor(arb_addr, mechanism): | ||||
|     """Ensure a ``ActorNursery.start_actor()`` spawned subactor | ||||
| def test_cancel_single_subactor(reg_addr, mechanism): | ||||
|     ''' | ||||
|     Ensure a ``ActorNursery.start_actor()`` spawned subactor | ||||
|     cancels when the nursery is cancelled. | ||||
|     """ | ||||
| 
 | ||||
|     ''' | ||||
|     async def spawn_actor(): | ||||
|         """Spawn an actor that blocks indefinitely. | ||||
|         """ | ||||
|         ''' | ||||
|         Spawn an actor that blocks indefinitely then cancel via | ||||
|         either `ActorNursery.cancel()` or an exception raise. | ||||
| 
 | ||||
|         ''' | ||||
|         async with tractor.open_nursery( | ||||
|             arbiter_addr=arb_addr, | ||||
|             registry_addrs=[reg_addr], | ||||
|         ) as nursery: | ||||
| 
 | ||||
|             portal = await nursery.start_actor( | ||||
|  |  | |||
|  | @ -141,7 +141,7 @@ async def open_actor_local_nursery( | |||
| ) | ||||
| def test_actor_managed_trio_nursery_task_error_cancels_aio( | ||||
|     asyncio_mode: bool, | ||||
|     arb_addr | ||||
|     reg_addr: tuple, | ||||
| ): | ||||
|     ''' | ||||
|     Verify that a ``trio`` nursery created managed in a child actor | ||||
|  |  | |||
|  | @ -5,7 +5,7 @@ Verify the we raise errors when streams are opened prior to | |||
| sync-opening a ``tractor.Context`` beforehand. | ||||
| 
 | ||||
| ''' | ||||
| from contextlib import asynccontextmanager as acm | ||||
| # from contextlib import asynccontextmanager as acm | ||||
| from itertools import count | ||||
| import platform | ||||
| from typing import Optional | ||||
|  |  | |||
|  | @ -78,7 +78,7 @@ has_nested_actors = pytest.mark.has_nested_actors | |||
| def spawn( | ||||
|     start_method, | ||||
|     testdir, | ||||
|     arb_addr, | ||||
|     reg_addr, | ||||
| ) -> 'pexpect.spawn': | ||||
| 
 | ||||
|     if start_method != 'trio': | ||||
|  |  | |||
|  | @ -15,19 +15,19 @@ from conftest import tractor_test | |||
| 
 | ||||
| 
 | ||||
| @tractor_test | ||||
| async def test_reg_then_unreg(arb_addr): | ||||
| async def test_reg_then_unreg(reg_addr): | ||||
|     actor = tractor.current_actor() | ||||
|     assert actor.is_arbiter | ||||
|     assert len(actor._registry) == 1  # only self is registered | ||||
| 
 | ||||
|     async with tractor.open_nursery( | ||||
|         arbiter_addr=arb_addr, | ||||
|         registry_addrs=[reg_addr], | ||||
|     ) as n: | ||||
| 
 | ||||
|         portal = await n.start_actor('actor', enable_modules=[__name__]) | ||||
|         uid = portal.channel.uid | ||||
| 
 | ||||
|         async with tractor.get_arbiter(*arb_addr) as aportal: | ||||
|         async with tractor.get_arbiter(*reg_addr) as aportal: | ||||
|             # this local actor should be the arbiter | ||||
|             assert actor is aportal.actor | ||||
| 
 | ||||
|  | @ -53,15 +53,27 @@ async def hi(): | |||
|     return the_line.format(tractor.current_actor().name) | ||||
| 
 | ||||
| 
 | ||||
| async def say_hello(other_actor): | ||||
| async def say_hello( | ||||
|     other_actor: str, | ||||
|     reg_addr: tuple[str, int], | ||||
| ): | ||||
|     await trio.sleep(1)  # wait for other actor to spawn | ||||
|     async with tractor.find_actor(other_actor) as portal: | ||||
|     async with tractor.find_actor( | ||||
|         other_actor, | ||||
|         registry_addrs=[reg_addr], | ||||
|     ) as portal: | ||||
|         assert portal is not None | ||||
|         return await portal.run(__name__, 'hi') | ||||
| 
 | ||||
| 
 | ||||
| async def say_hello_use_wait(other_actor): | ||||
|     async with tractor.wait_for_actor(other_actor) as portal: | ||||
| async def say_hello_use_wait( | ||||
|     other_actor: str, | ||||
|     reg_addr: tuple[str, int], | ||||
| ): | ||||
|     async with tractor.wait_for_actor( | ||||
|         other_actor, | ||||
|         registry_addr=reg_addr, | ||||
|     ) as portal: | ||||
|         assert portal is not None | ||||
|         result = await portal.run(__name__, 'hi') | ||||
|         return result | ||||
|  | @ -69,21 +81,29 @@ async def say_hello_use_wait(other_actor): | |||
| 
 | ||||
| @tractor_test | ||||
| @pytest.mark.parametrize('func', [say_hello, say_hello_use_wait]) | ||||
| async def test_trynamic_trio(func, start_method, arb_addr): | ||||
|     """Main tractor entry point, the "master" process (for now | ||||
|     acts as the "director"). | ||||
|     """ | ||||
| async def test_trynamic_trio( | ||||
|     func, | ||||
|     start_method, | ||||
|     reg_addr, | ||||
| ): | ||||
|     ''' | ||||
|     Root actor acting as the "director" and running one-shot-task-actors | ||||
|     for the directed subs. | ||||
| 
 | ||||
|     ''' | ||||
|     async with tractor.open_nursery() as n: | ||||
|         print("Alright... Action!") | ||||
| 
 | ||||
|         donny = await n.run_in_actor( | ||||
|             func, | ||||
|             other_actor='gretchen', | ||||
|             reg_addr=reg_addr, | ||||
|             name='donny', | ||||
|         ) | ||||
|         gretchen = await n.run_in_actor( | ||||
|             func, | ||||
|             other_actor='donny', | ||||
|             reg_addr=reg_addr, | ||||
|             name='gretchen', | ||||
|         ) | ||||
|         print(await gretchen.result()) | ||||
|  | @ -131,7 +151,7 @@ async def unpack_reg(actor_or_portal): | |||
| 
 | ||||
| 
 | ||||
| async def spawn_and_check_registry( | ||||
|     arb_addr: tuple, | ||||
|     reg_addr: tuple, | ||||
|     use_signal: bool, | ||||
|     remote_arbiter: bool = False, | ||||
|     with_streaming: bool = False, | ||||
|  | @ -139,9 +159,9 @@ async def spawn_and_check_registry( | |||
| ) -> None: | ||||
| 
 | ||||
|     async with tractor.open_root_actor( | ||||
|         arbiter_addr=arb_addr, | ||||
|         registry_addrs=[reg_addr], | ||||
|     ): | ||||
|         async with tractor.get_arbiter(*arb_addr) as portal: | ||||
|         async with tractor.get_arbiter(*reg_addr) as portal: | ||||
|             # runtime needs to be up to call this | ||||
|             actor = tractor.current_actor() | ||||
| 
 | ||||
|  | @ -213,17 +233,19 @@ async def spawn_and_check_registry( | |||
| def test_subactors_unregister_on_cancel( | ||||
|     start_method, | ||||
|     use_signal, | ||||
|     arb_addr, | ||||
|     reg_addr, | ||||
|     with_streaming, | ||||
| ): | ||||
|     """Verify that cancelling a nursery results in all subactors | ||||
|     ''' | ||||
|     Verify that cancelling a nursery results in all subactors | ||||
|     deregistering themselves with the arbiter. | ||||
|     """ | ||||
| 
 | ||||
|     ''' | ||||
|     with pytest.raises(KeyboardInterrupt): | ||||
|         trio.run( | ||||
|             partial( | ||||
|                 spawn_and_check_registry, | ||||
|                 arb_addr, | ||||
|                 reg_addr, | ||||
|                 use_signal, | ||||
|                 remote_arbiter=False, | ||||
|                 with_streaming=with_streaming, | ||||
|  | @ -237,7 +259,7 @@ def test_subactors_unregister_on_cancel_remote_daemon( | |||
|     daemon, | ||||
|     start_method, | ||||
|     use_signal, | ||||
|     arb_addr, | ||||
|     reg_addr, | ||||
|     with_streaming, | ||||
| ): | ||||
|     """Verify that cancelling a nursery results in all subactors | ||||
|  | @ -248,7 +270,7 @@ def test_subactors_unregister_on_cancel_remote_daemon( | |||
|         trio.run( | ||||
|             partial( | ||||
|                 spawn_and_check_registry, | ||||
|                 arb_addr, | ||||
|                 reg_addr, | ||||
|                 use_signal, | ||||
|                 remote_arbiter=True, | ||||
|                 with_streaming=with_streaming, | ||||
|  | @ -262,7 +284,7 @@ async def streamer(agen): | |||
| 
 | ||||
| 
 | ||||
| async def close_chans_before_nursery( | ||||
|     arb_addr: tuple, | ||||
|     reg_addr: tuple, | ||||
|     use_signal: bool, | ||||
|     remote_arbiter: bool = False, | ||||
| ) -> None: | ||||
|  | @ -275,9 +297,9 @@ async def close_chans_before_nursery( | |||
|         entries_at_end = 1 | ||||
| 
 | ||||
|     async with tractor.open_root_actor( | ||||
|         arbiter_addr=arb_addr, | ||||
|         registry_addrs=[reg_addr], | ||||
|     ): | ||||
|         async with tractor.get_arbiter(*arb_addr) as aportal: | ||||
|         async with tractor.get_arbiter(*reg_addr) as aportal: | ||||
|             try: | ||||
|                 get_reg = partial(unpack_reg, aportal) | ||||
| 
 | ||||
|  | @ -329,7 +351,7 @@ async def close_chans_before_nursery( | |||
| def test_close_channel_explicit( | ||||
|     start_method, | ||||
|     use_signal, | ||||
|     arb_addr, | ||||
|     reg_addr, | ||||
| ): | ||||
|     """Verify that closing a stream explicitly and killing the actor's | ||||
|     "root nursery" **before** the containing nursery tears down also | ||||
|  | @ -339,7 +361,7 @@ def test_close_channel_explicit( | |||
|         trio.run( | ||||
|             partial( | ||||
|                 close_chans_before_nursery, | ||||
|                 arb_addr, | ||||
|                 reg_addr, | ||||
|                 use_signal, | ||||
|                 remote_arbiter=False, | ||||
|             ), | ||||
|  | @ -351,7 +373,7 @@ def test_close_channel_explicit_remote_arbiter( | |||
|     daemon, | ||||
|     start_method, | ||||
|     use_signal, | ||||
|     arb_addr, | ||||
|     reg_addr, | ||||
| ): | ||||
|     """Verify that closing a stream explicitly and killing the actor's | ||||
|     "root nursery" **before** the containing nursery tears down also | ||||
|  | @ -361,7 +383,7 @@ def test_close_channel_explicit_remote_arbiter( | |||
|         trio.run( | ||||
|             partial( | ||||
|                 close_chans_before_nursery, | ||||
|                 arb_addr, | ||||
|                 reg_addr, | ||||
|                 use_signal, | ||||
|                 remote_arbiter=True, | ||||
|             ), | ||||
|  |  | |||
|  | @ -47,7 +47,7 @@ async def trio_cancels_single_aio_task(): | |||
|         await tractor.to_asyncio.run_task(sleep_forever) | ||||
| 
 | ||||
| 
 | ||||
| def test_trio_cancels_aio_on_actor_side(arb_addr): | ||||
| def test_trio_cancels_aio_on_actor_side(reg_addr): | ||||
|     ''' | ||||
|     Spawn an infected actor that is cancelled by the ``trio`` side | ||||
|     task using std cancel scope apis. | ||||
|  | @ -55,7 +55,7 @@ def test_trio_cancels_aio_on_actor_side(arb_addr): | |||
|     ''' | ||||
|     async def main(): | ||||
|         async with tractor.open_nursery( | ||||
|             arbiter_addr=arb_addr | ||||
|             registry_addrs=[reg_addr] | ||||
|         ) as n: | ||||
|             await n.run_in_actor( | ||||
|                 trio_cancels_single_aio_task, | ||||
|  | @ -94,7 +94,7 @@ async def asyncio_actor( | |||
|         raise | ||||
| 
 | ||||
| 
 | ||||
| def test_aio_simple_error(arb_addr): | ||||
| def test_aio_simple_error(reg_addr): | ||||
|     ''' | ||||
|     Verify a simple remote asyncio error propagates back through trio | ||||
|     to the parent actor. | ||||
|  | @ -103,7 +103,7 @@ def test_aio_simple_error(arb_addr): | |||
|     ''' | ||||
|     async def main(): | ||||
|         async with tractor.open_nursery( | ||||
|             arbiter_addr=arb_addr | ||||
|             registry_addrs=[reg_addr] | ||||
|         ) as n: | ||||
|             await n.run_in_actor( | ||||
|                 asyncio_actor, | ||||
|  | @ -120,7 +120,7 @@ def test_aio_simple_error(arb_addr): | |||
|     assert err.type == AssertionError | ||||
| 
 | ||||
| 
 | ||||
| def test_tractor_cancels_aio(arb_addr): | ||||
| def test_tractor_cancels_aio(reg_addr): | ||||
|     ''' | ||||
|     Verify we can cancel a spawned asyncio task gracefully. | ||||
| 
 | ||||
|  | @ -139,7 +139,7 @@ def test_tractor_cancels_aio(arb_addr): | |||
|     trio.run(main) | ||||
| 
 | ||||
| 
 | ||||
| def test_trio_cancels_aio(arb_addr): | ||||
| def test_trio_cancels_aio(reg_addr): | ||||
|     ''' | ||||
|     Much like the above test with ``tractor.Portal.cancel_actor()`` | ||||
|     except we just use a standard ``trio`` cancellation api. | ||||
|  | @ -194,7 +194,7 @@ async def trio_ctx( | |||
|     ids='parent_actor_cancels_child={}'.format | ||||
| ) | ||||
| def test_context_spawns_aio_task_that_errors( | ||||
|     arb_addr, | ||||
|     reg_addr, | ||||
|     parent_cancels: bool, | ||||
| ): | ||||
|     ''' | ||||
|  | @ -258,7 +258,7 @@ async def aio_cancel(): | |||
|     await sleep_forever() | ||||
| 
 | ||||
| 
 | ||||
| def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr): | ||||
| def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr): | ||||
| 
 | ||||
|     async def main(): | ||||
|         async with tractor.open_nursery() as n: | ||||
|  | @ -395,7 +395,7 @@ async def stream_from_aio( | |||
|     'fan_out', [False, True], | ||||
|     ids='fan_out_w_chan_subscribe={}'.format | ||||
| ) | ||||
| def test_basic_interloop_channel_stream(arb_addr, fan_out): | ||||
| def test_basic_interloop_channel_stream(reg_addr, fan_out): | ||||
|     async def main(): | ||||
|         async with tractor.open_nursery() as n: | ||||
|             portal = await n.run_in_actor( | ||||
|  | @ -409,7 +409,7 @@ def test_basic_interloop_channel_stream(arb_addr, fan_out): | |||
| 
 | ||||
| 
 | ||||
| # TODO: parametrize the above test and avoid the duplication here? | ||||
| def test_trio_error_cancels_intertask_chan(arb_addr): | ||||
| def test_trio_error_cancels_intertask_chan(reg_addr): | ||||
|     async def main(): | ||||
|         async with tractor.open_nursery() as n: | ||||
|             portal = await n.run_in_actor( | ||||
|  | @ -428,7 +428,7 @@ def test_trio_error_cancels_intertask_chan(arb_addr): | |||
|         assert exc.type == Exception | ||||
| 
 | ||||
| 
 | ||||
| def test_trio_closes_early_and_channel_exits(arb_addr): | ||||
| def test_trio_closes_early_and_channel_exits(reg_addr): | ||||
|     async def main(): | ||||
|         async with tractor.open_nursery() as n: | ||||
|             portal = await n.run_in_actor( | ||||
|  | @ -443,7 +443,7 @@ def test_trio_closes_early_and_channel_exits(arb_addr): | |||
|     trio.run(main) | ||||
| 
 | ||||
| 
 | ||||
| def test_aio_errors_and_channel_propagates_and_closes(arb_addr): | ||||
| def test_aio_errors_and_channel_propagates_and_closes(reg_addr): | ||||
|     async def main(): | ||||
|         async with tractor.open_nursery() as n: | ||||
|             portal = await n.run_in_actor( | ||||
|  | @ -520,7 +520,7 @@ async def trio_to_aio_echo_server( | |||
|     ids='raise_error={}'.format, | ||||
| ) | ||||
| def test_echoserver_detailed_mechanics( | ||||
|     arb_addr, | ||||
|     reg_addr, | ||||
|     raise_error_mid_stream, | ||||
| ): | ||||
| 
 | ||||
|  |  | |||
|  | @ -55,7 +55,7 @@ async def context_stream( | |||
| 
 | ||||
| 
 | ||||
| async def stream_from_single_subactor( | ||||
|     arb_addr, | ||||
|     reg_addr, | ||||
|     start_method, | ||||
|     stream_func, | ||||
| ): | ||||
|  | @ -64,7 +64,7 @@ async def stream_from_single_subactor( | |||
|     # only one per host address, spawns an actor if None | ||||
| 
 | ||||
|     async with tractor.open_nursery( | ||||
|         arbiter_addr=arb_addr, | ||||
|         registry_addrs=[reg_addr], | ||||
|         start_method=start_method, | ||||
|     ) as nursery: | ||||
| 
 | ||||
|  | @ -115,13 +115,13 @@ async def stream_from_single_subactor( | |||
| @pytest.mark.parametrize( | ||||
|     'stream_func', [async_gen_stream, context_stream] | ||||
| ) | ||||
| def test_stream_from_single_subactor(arb_addr, start_method, stream_func): | ||||
| def test_stream_from_single_subactor(reg_addr, start_method, stream_func): | ||||
|     """Verify streaming from a spawned async generator. | ||||
|     """ | ||||
|     trio.run( | ||||
|         partial( | ||||
|             stream_from_single_subactor, | ||||
|             arb_addr, | ||||
|             reg_addr, | ||||
|             start_method, | ||||
|             stream_func=stream_func, | ||||
|         ), | ||||
|  | @ -225,14 +225,14 @@ async def a_quadruple_example(): | |||
|         return result_stream | ||||
| 
 | ||||
| 
 | ||||
| async def cancel_after(wait, arb_addr): | ||||
|     async with tractor.open_root_actor(arbiter_addr=arb_addr): | ||||
| async def cancel_after(wait, reg_addr): | ||||
|     async with tractor.open_root_actor(registry_addrs=[reg_addr]): | ||||
|         with trio.move_on_after(wait): | ||||
|             return await a_quadruple_example() | ||||
| 
 | ||||
| 
 | ||||
| @pytest.fixture(scope='module') | ||||
| def time_quad_ex(arb_addr, ci_env, spawn_backend): | ||||
| def time_quad_ex(reg_addr, ci_env, spawn_backend): | ||||
|     if spawn_backend == 'mp': | ||||
|         """no idea but the  mp *nix runs are flaking out here often... | ||||
|         """ | ||||
|  | @ -240,7 +240,7 @@ def time_quad_ex(arb_addr, ci_env, spawn_backend): | |||
| 
 | ||||
|     timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4 | ||||
|     start = time.time() | ||||
|     results = trio.run(cancel_after, timeout, arb_addr) | ||||
|     results = trio.run(cancel_after, timeout, reg_addr) | ||||
|     diff = time.time() - start | ||||
|     assert results | ||||
|     return results, diff | ||||
|  | @ -260,14 +260,14 @@ def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend): | |||
|     list(map(lambda i: i/10, range(3, 9))) | ||||
| ) | ||||
| def test_not_fast_enough_quad( | ||||
|     arb_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend | ||||
|     reg_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend | ||||
| ): | ||||
|     """Verify we can cancel midway through the quad example and all actors | ||||
|     cancel gracefully. | ||||
|     """ | ||||
|     results, diff = time_quad_ex | ||||
|     delay = max(diff - cancel_delay, 0) | ||||
|     results = trio.run(cancel_after, delay, arb_addr) | ||||
|     results = trio.run(cancel_after, delay, reg_addr) | ||||
|     system = platform.system() | ||||
|     if system in ('Windows', 'Darwin') and results is not None: | ||||
|         # In CI envoirments it seems later runs are quicker then the first | ||||
|  | @ -280,7 +280,7 @@ def test_not_fast_enough_quad( | |||
| 
 | ||||
| @tractor_test | ||||
| async def test_respawn_consumer_task( | ||||
|     arb_addr, | ||||
|     reg_addr, | ||||
|     spawn_backend, | ||||
|     loglevel, | ||||
| ): | ||||
|  |  | |||
|  | @ -24,7 +24,7 @@ async def test_no_runtime(): | |||
| 
 | ||||
| 
 | ||||
| @tractor_test | ||||
| async def test_self_is_registered(arb_addr): | ||||
| async def test_self_is_registered(reg_addr): | ||||
|     "Verify waiting on the arbiter to register itself using the standard api." | ||||
|     actor = tractor.current_actor() | ||||
|     assert actor.is_arbiter | ||||
|  | @ -34,20 +34,20 @@ async def test_self_is_registered(arb_addr): | |||
| 
 | ||||
| 
 | ||||
| @tractor_test | ||||
| async def test_self_is_registered_localportal(arb_addr): | ||||
| async def test_self_is_registered_localportal(reg_addr): | ||||
|     "Verify waiting on the arbiter to register itself using a local portal." | ||||
|     actor = tractor.current_actor() | ||||
|     assert actor.is_arbiter | ||||
|     async with tractor.get_arbiter(*arb_addr) as portal: | ||||
|     async with tractor.get_arbiter(*reg_addr) as portal: | ||||
|         assert isinstance(portal, tractor._portal.LocalPortal) | ||||
| 
 | ||||
|         with trio.fail_after(0.2): | ||||
|             sockaddr = await portal.run_from_ns( | ||||
|                     'self', 'wait_for_actor', name='root') | ||||
|             assert sockaddr[0] == arb_addr | ||||
|             assert sockaddr[0] == reg_addr | ||||
| 
 | ||||
| 
 | ||||
| def test_local_actor_async_func(arb_addr): | ||||
| def test_local_actor_async_func(reg_addr): | ||||
|     """Verify a simple async function in-process. | ||||
|     """ | ||||
|     nums = [] | ||||
|  | @ -55,7 +55,7 @@ def test_local_actor_async_func(arb_addr): | |||
|     async def print_loop(): | ||||
| 
 | ||||
|         async with tractor.open_root_actor( | ||||
|             arbiter_addr=arb_addr, | ||||
|             registry_addrs=[reg_addr], | ||||
|         ): | ||||
|             # arbiter is started in-proc if dne | ||||
|             assert tractor.current_actor().is_arbiter | ||||
|  |  | |||
|  | @ -28,9 +28,9 @@ def test_abort_on_sigint(daemon): | |||
| 
 | ||||
| 
 | ||||
| @tractor_test | ||||
| async def test_cancel_remote_arbiter(daemon, arb_addr): | ||||
| async def test_cancel_remote_arbiter(daemon, reg_addr): | ||||
|     assert not tractor.current_actor().is_arbiter | ||||
|     async with tractor.get_arbiter(*arb_addr) as portal: | ||||
|     async with tractor.get_arbiter(*reg_addr) as portal: | ||||
|         await portal.cancel_actor() | ||||
| 
 | ||||
|     time.sleep(0.1) | ||||
|  | @ -39,16 +39,16 @@ async def test_cancel_remote_arbiter(daemon, arb_addr): | |||
| 
 | ||||
|     # no arbiter socket should exist | ||||
|     with pytest.raises(OSError): | ||||
|         async with tractor.get_arbiter(*arb_addr) as portal: | ||||
|         async with tractor.get_arbiter(*reg_addr) as portal: | ||||
|             pass | ||||
| 
 | ||||
| 
 | ||||
| def test_register_duplicate_name(daemon, arb_addr): | ||||
| def test_register_duplicate_name(daemon, reg_addr): | ||||
| 
 | ||||
|     async def main(): | ||||
| 
 | ||||
|         async with tractor.open_nursery( | ||||
|             arbiter_addr=arb_addr, | ||||
|             registry_addrs=[reg_addr], | ||||
|         ) as n: | ||||
| 
 | ||||
|             assert not tractor.current_actor().is_arbiter | ||||
|  |  | |||
|  | @ -160,7 +160,7 @@ async def test_required_args(callwith_expecterror): | |||
| ) | ||||
| def test_multi_actor_subs_arbiter_pub( | ||||
|     loglevel, | ||||
|     arb_addr, | ||||
|     reg_addr, | ||||
|     pub_actor, | ||||
| ): | ||||
|     """Try out the neato @pub decorator system. | ||||
|  | @ -170,7 +170,7 @@ def test_multi_actor_subs_arbiter_pub( | |||
|     async def main(): | ||||
| 
 | ||||
|         async with tractor.open_nursery( | ||||
|             arbiter_addr=arb_addr, | ||||
|             registry_addrs=[reg_addr], | ||||
|             enable_modules=[__name__], | ||||
|         ) as n: | ||||
| 
 | ||||
|  | @ -255,12 +255,12 @@ def test_multi_actor_subs_arbiter_pub( | |||
| 
 | ||||
| def test_single_subactor_pub_multitask_subs( | ||||
|     loglevel, | ||||
|     arb_addr, | ||||
|     reg_addr, | ||||
| ): | ||||
|     async def main(): | ||||
| 
 | ||||
|         async with tractor.open_nursery( | ||||
|             arbiter_addr=arb_addr, | ||||
|             registry_addrs=[reg_addr], | ||||
|             enable_modules=[__name__], | ||||
|         ) as n: | ||||
| 
 | ||||
|  |  | |||
|  | @ -45,7 +45,7 @@ async def short_sleep(): | |||
|     ids=['no_mods', 'this_mod', 'this_mod_bad_func', 'fail_to_import', | ||||
|          'fail_on_syntax'], | ||||
| ) | ||||
| def test_rpc_errors(arb_addr, to_call, testdir): | ||||
| def test_rpc_errors(reg_addr, to_call, testdir): | ||||
|     """Test errors when making various RPC requests to an actor | ||||
|     that either doesn't have the requested module exposed or doesn't define | ||||
|     the named function. | ||||
|  | @ -77,7 +77,7 @@ def test_rpc_errors(arb_addr, to_call, testdir): | |||
| 
 | ||||
|         # spawn a subactor which calls us back | ||||
|         async with tractor.open_nursery( | ||||
|             arbiter_addr=arb_addr, | ||||
|             arbiter_addr=reg_addr, | ||||
|             enable_modules=exposed_mods.copy(), | ||||
|         ) as n: | ||||
| 
 | ||||
|  |  | |||
|  | @ -16,14 +16,14 @@ data_to_pass_down = {'doggy': 10, 'kitty': 4} | |||
| async def spawn( | ||||
|     is_arbiter: bool, | ||||
|     data: dict, | ||||
|     arb_addr: tuple[str, int], | ||||
|     reg_addr: tuple[str, int], | ||||
| ): | ||||
|     namespaces = [__name__] | ||||
| 
 | ||||
|     await trio.sleep(0.1) | ||||
| 
 | ||||
|     async with tractor.open_root_actor( | ||||
|         arbiter_addr=arb_addr, | ||||
|         arbiter_addr=reg_addr, | ||||
|     ): | ||||
| 
 | ||||
|         actor = tractor.current_actor() | ||||
|  | @ -41,7 +41,7 @@ async def spawn( | |||
|                     is_arbiter=False, | ||||
|                     name='sub-actor', | ||||
|                     data=data, | ||||
|                     arb_addr=arb_addr, | ||||
|                     reg_addr=reg_addr, | ||||
|                     enable_modules=namespaces, | ||||
|                 ) | ||||
| 
 | ||||
|  | @ -55,12 +55,12 @@ async def spawn( | |||
|             return 10 | ||||
| 
 | ||||
| 
 | ||||
| def test_local_arbiter_subactor_global_state(arb_addr): | ||||
| def test_local_arbiter_subactor_global_state(reg_addr): | ||||
|     result = trio.run( | ||||
|         spawn, | ||||
|         True, | ||||
|         data_to_pass_down, | ||||
|         arb_addr, | ||||
|         reg_addr, | ||||
|     ) | ||||
|     assert result == 10 | ||||
| 
 | ||||
|  | @ -140,7 +140,7 @@ async def check_loglevel(level): | |||
| def test_loglevel_propagated_to_subactor( | ||||
|     start_method, | ||||
|     capfd, | ||||
|     arb_addr, | ||||
|     reg_addr, | ||||
| ): | ||||
|     if start_method == 'mp_forkserver': | ||||
|         pytest.skip( | ||||
|  | @ -152,7 +152,7 @@ def test_loglevel_propagated_to_subactor( | |||
|         async with tractor.open_nursery( | ||||
|             name='arbiter', | ||||
|             start_method=start_method, | ||||
|             arbiter_addr=arb_addr, | ||||
|             arbiter_addr=reg_addr, | ||||
| 
 | ||||
|         ) as tn: | ||||
|             await tn.run_in_actor( | ||||
|  |  | |||
|  | @ -66,13 +66,13 @@ async def ensure_sequence( | |||
| async def open_sequence_streamer( | ||||
| 
 | ||||
|     sequence: list[int], | ||||
|     arb_addr: tuple[str, int], | ||||
|     reg_addr: tuple[str, int], | ||||
|     start_method: str, | ||||
| 
 | ||||
| ) -> tractor.MsgStream: | ||||
| 
 | ||||
|     async with tractor.open_nursery( | ||||
|         arbiter_addr=arb_addr, | ||||
|         arbiter_addr=reg_addr, | ||||
|         start_method=start_method, | ||||
|     ) as tn: | ||||
| 
 | ||||
|  | @ -93,7 +93,7 @@ async def open_sequence_streamer( | |||
| 
 | ||||
| 
 | ||||
| def test_stream_fan_out_to_local_subscriptions( | ||||
|     arb_addr, | ||||
|     reg_addr, | ||||
|     start_method, | ||||
| ): | ||||
| 
 | ||||
|  | @ -103,7 +103,7 @@ def test_stream_fan_out_to_local_subscriptions( | |||
| 
 | ||||
|         async with open_sequence_streamer( | ||||
|             sequence, | ||||
|             arb_addr, | ||||
|             reg_addr, | ||||
|             start_method, | ||||
|         ) as stream: | ||||
| 
 | ||||
|  | @ -138,7 +138,7 @@ def test_stream_fan_out_to_local_subscriptions( | |||
|     ] | ||||
| ) | ||||
| def test_consumer_and_parent_maybe_lag( | ||||
|     arb_addr, | ||||
|     reg_addr, | ||||
|     start_method, | ||||
|     task_delays, | ||||
| ): | ||||
|  | @ -150,7 +150,7 @@ def test_consumer_and_parent_maybe_lag( | |||
| 
 | ||||
|         async with open_sequence_streamer( | ||||
|             sequence, | ||||
|             arb_addr, | ||||
|             reg_addr, | ||||
|             start_method, | ||||
|         ) as stream: | ||||
| 
 | ||||
|  | @ -211,7 +211,7 @@ def test_consumer_and_parent_maybe_lag( | |||
| 
 | ||||
| 
 | ||||
| def test_faster_task_to_recv_is_cancelled_by_slower( | ||||
|     arb_addr, | ||||
|     reg_addr, | ||||
|     start_method, | ||||
| ): | ||||
|     ''' | ||||
|  | @ -225,7 +225,7 @@ def test_faster_task_to_recv_is_cancelled_by_slower( | |||
| 
 | ||||
|         async with open_sequence_streamer( | ||||
|             sequence, | ||||
|             arb_addr, | ||||
|             reg_addr, | ||||
|             start_method, | ||||
| 
 | ||||
|         ) as stream: | ||||
|  | @ -302,7 +302,7 @@ def test_subscribe_errors_after_close(): | |||
| 
 | ||||
| 
 | ||||
| def test_ensure_slow_consumers_lag_out( | ||||
|     arb_addr, | ||||
|     reg_addr, | ||||
|     start_method, | ||||
| ): | ||||
|     '''This is a pure local task test; no tractor | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue