diff --git a/docs/index.rst b/docs/index.rst index 20b03e5..4ed5a47 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -145,7 +145,7 @@ and use the ``run_in_actor()`` method: What's going on? -- an initial *actor* is started with ``tractor.run()`` and told to execute +- an initial *actor* is started with ``trio.run()`` and told to execute its main task_: ``main()`` - inside ``main()`` an actor is *spawned* using an ``ActorNusery`` and is told @@ -182,7 +182,7 @@ Here is a similar example using the latter method: .. literalinclude:: ../examples/actor_spawning_and_causality_with_daemon.py -The ``rpc_module_paths`` `kwarg` above is a list of module path +The ``enable_modules`` `kwarg` above is a list of module path strings that will be loaded and made accessible for execution in the remote actor through a call to ``Portal.run()``. For now this is a simple mechanism to restrict the functionality of the remote @@ -458,7 +458,7 @@ find an actor's socket address by name use the ``find_actor()`` function: .. literalinclude:: ../examples/service_discovery.py The ``name`` value you should pass to ``find_actor()`` is the one you passed as the -*first* argument to either ``tractor.run()`` or ``ActorNursery.start_actor()``. +*first* argument to either ``trio.run()`` or ``ActorNursery.start_actor()``. Running actors standalone @@ -472,7 +472,17 @@ need to hop into a debugger. You just need to pass the existing .. code:: python - tractor.run(main, arbiter_addr=('192.168.0.10', 1616)) + import trio + import tractor + + async def main(): + + async with tractor.open_root_actor( + arbiter_addr=('192.168.0.10', 1616) + ): + await trio.sleep_forever() + + trio.run(main) Choosing a process spawning backend @@ -480,7 +490,7 @@ Choosing a process spawning backend ``tractor`` is architected to support multiple actor (sub-process) spawning backends. Specific defaults are chosen based on your system but you can also explicitly select a backend of choice at startup -via a ``start_method`` kwarg to ``tractor.run()``. +via a ``start_method`` kwarg to ``tractor.open_nursery()``. Currently the options available are: @@ -536,13 +546,14 @@ main python module of the program: .. code:: python # application/__main__.py + import trio import tractor import multiprocessing from . import tractor_app if __name__ == '__main__': multiprocessing.freeze_support() - tractor.run(tractor_app.main) + trio.run(tractor_app.main) And execute as:: diff --git a/examples/__main__.py b/examples/__main__.py index c3f7add..66a1bf8 100644 --- a/examples/__main__.py +++ b/examples/__main__.py @@ -16,4 +16,4 @@ if __name__ == '__main__': # temporary dir and name it test_example.py. We import that script # module here and invoke it's ``main()``. from . import test_example - test_example.tractor.run(test_example.main, start_method='spawn') + test_example.trio.run(test_example.main) diff --git a/examples/a_trynamic_first_scene.py b/examples/a_trynamic_first_scene.py index dca59c2..1d53125 100644 --- a/examples/a_trynamic_first_scene.py +++ b/examples/a_trynamic_first_scene.py @@ -1,3 +1,4 @@ +import trio import tractor _this_module = __name__ @@ -40,4 +41,4 @@ async def main(): if __name__ == '__main__': - tractor.run(main) + trio.run(main) diff --git a/examples/actor_spawning_and_causality.py b/examples/actor_spawning_and_causality.py index ca793f1..119726d 100644 --- a/examples/actor_spawning_and_causality.py +++ b/examples/actor_spawning_and_causality.py @@ -1,3 +1,4 @@ +import trio import tractor @@ -23,4 +24,4 @@ async def main(): if __name__ == '__main__': - tractor.run(main) + trio.run(main) diff --git a/examples/actor_spawning_and_causality_with_daemon.py b/examples/actor_spawning_and_causality_with_daemon.py index 1ab0f88..b052871 100644 --- a/examples/actor_spawning_and_causality_with_daemon.py +++ b/examples/actor_spawning_and_causality_with_daemon.py @@ -1,3 +1,4 @@ +import trio import tractor @@ -16,7 +17,7 @@ async def main(): portal = await n.start_actor( 'frank', # enable the actor to run funcs from this current module - rpc_module_paths=[__name__], + enable_modules=[__name__], ) print(await portal.run(movie_theatre_question)) @@ -30,4 +31,4 @@ async def main(): if __name__ == '__main__': - tractor.run(main) + trio.run(main) diff --git a/examples/asynchronous_generators.py b/examples/asynchronous_generators.py index 47ee136..99b0a13 100644 --- a/examples/asynchronous_generators.py +++ b/examples/asynchronous_generators.py @@ -14,12 +14,15 @@ async def stream_forever(): async def main(): + # stream for at most 1 seconds with trio.move_on_after(1) as cancel_scope: + async with tractor.open_nursery() as n: + portal = await n.start_actor( - f'donny', - rpc_module_paths=[__name__], + 'donny', + enable_modules=[__name__], ) # this async for loop streams values from the above @@ -34,4 +37,4 @@ async def main(): if __name__ == '__main__': - tractor.run(main) + trio.run(main) diff --git a/examples/debugging/multi_daemon_subactors.py b/examples/debugging/multi_daemon_subactors.py index c37b879..c506de4 100644 --- a/examples/debugging/multi_daemon_subactors.py +++ b/examples/debugging/multi_daemon_subactors.py @@ -11,7 +11,7 @@ async def breakpoint_forever(): async def name_error(): "Raise a ``NameError``" - getattr(doggypants) + getattr(doggypants) # noqa async def main(): diff --git a/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py b/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py index e754599..f9adb25 100644 --- a/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py +++ b/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py @@ -4,7 +4,7 @@ import tractor async def name_error(): "Raise a ``NameError``" - getattr(doggypants) + getattr(doggypants) # noqa async def breakpoint_forever(): diff --git a/examples/debugging/multi_subactor_root_errors.py b/examples/debugging/multi_subactor_root_errors.py index 8a7fa43..6c69618 100644 --- a/examples/debugging/multi_subactor_root_errors.py +++ b/examples/debugging/multi_subactor_root_errors.py @@ -1,9 +1,10 @@ +import trio import tractor async def name_error(): "Raise a ``NameError``" - getattr(doggypants) + getattr(doggypants) # noqa async def spawn_error(): @@ -32,7 +33,9 @@ async def main(): - root actor should then fail on assert - program termination """ - async with tractor.open_nursery() as n: + async with tractor.open_nursery( + debug_mode=True, + ) as n: # spawn both actors portal = await n.run_in_actor( @@ -54,4 +57,4 @@ async def main(): if __name__ == '__main__': - tractor.run(main, debug_mode=True) + trio.run(main) diff --git a/examples/debugging/multi_subactors.py b/examples/debugging/multi_subactors.py index 0d5ee83..259d526 100644 --- a/examples/debugging/multi_subactors.py +++ b/examples/debugging/multi_subactors.py @@ -11,7 +11,7 @@ async def breakpoint_forever(): async def name_error(): "Raise a ``NameError``" - getattr(doggypants) + getattr(doggypants) # noqa async def spawn_error(): @@ -36,7 +36,9 @@ async def main(): `-python -m tractor._child --uid ('spawn_error', '52ee14a5 ...) `-python -m tractor._child --uid ('name_error', '3391222c ...) """ - async with tractor.open_nursery() as n: + async with tractor.open_nursery( + debug_mode=True, + ) as n: # Spawn both actors, don't bother with collecting results # (would result in a different debugger outcome due to parent's @@ -47,4 +49,4 @@ async def main(): if __name__ == '__main__': - tractor.run(main, debug_mode=True) + trio.run(main) diff --git a/examples/debugging/root_actor_breakpoint.py b/examples/debugging/root_actor_breakpoint.py index bd4dcb1..5c858d4 100644 --- a/examples/debugging/root_actor_breakpoint.py +++ b/examples/debugging/root_actor_breakpoint.py @@ -4,12 +4,16 @@ import tractor async def main(): - await trio.sleep(0.1) + async with tractor.open_root_actor( + debug_mode=True, + ): - await tractor.breakpoint() + await trio.sleep(0.1) - await trio.sleep(0.1) + await tractor.breakpoint() + + await trio.sleep(0.1) if __name__ == '__main__': - tractor.run(main, debug_mode=True) + trio.run(main) diff --git a/examples/debugging/root_actor_breakpoint_forever.py b/examples/debugging/root_actor_breakpoint_forever.py index 0332ab6..3536a75 100644 --- a/examples/debugging/root_actor_breakpoint_forever.py +++ b/examples/debugging/root_actor_breakpoint_forever.py @@ -1,11 +1,15 @@ +import trio import tractor async def main(): - while True: - await tractor.breakpoint() + async with tractor.open_root_actor( + debug_mode=True, + ): + while True: + await tractor.breakpoint() if __name__ == '__main__': - tractor.run(main, debug_mode=True) + trio.run(main) diff --git a/examples/debugging/root_actor_error.py b/examples/debugging/root_actor_error.py index 7486699..fab4633 100644 --- a/examples/debugging/root_actor_error.py +++ b/examples/debugging/root_actor_error.py @@ -1,9 +1,13 @@ +import trio import tractor async def main(): - assert 0 + async with tractor.open_root_actor( + debug_mode=True, + ): + assert 0 if __name__ == '__main__': - tractor.run(main, debug_mode=True) + trio.run(main) diff --git a/examples/debugging/root_cancelled_but_child_is_in_tty_lock.py b/examples/debugging/root_cancelled_but_child_is_in_tty_lock.py index d0a1649..16f92b8 100644 --- a/examples/debugging/root_cancelled_but_child_is_in_tty_lock.py +++ b/examples/debugging/root_cancelled_but_child_is_in_tty_lock.py @@ -1,9 +1,10 @@ +import trio import tractor async def name_error(): "Raise a ``NameError``" - getattr(doggypants) + getattr(doggypants) # noqa async def spawn_until(depth=0): @@ -37,7 +38,10 @@ async def main(): └─ python -m tractor._child --uid ('name_error', '6c2733b8 ...) """ - async with tractor.open_nursery() as n: + async with tractor.open_nursery( + debug_mode=True, + loglevel='warning' + ) as n: # spawn both actors portal = await n.run_in_actor( @@ -58,4 +62,4 @@ async def main(): if __name__ == '__main__': - tractor.run(main, debug_mode=True, loglevel='warning') + trio.run(main) diff --git a/examples/debugging/subactor_breakpoint.py b/examples/debugging/subactor_breakpoint.py index d880404..bcc304d 100644 --- a/examples/debugging/subactor_breakpoint.py +++ b/examples/debugging/subactor_breakpoint.py @@ -12,7 +12,9 @@ async def breakpoint_forever(): async def main(): - async with tractor.open_nursery() as n: + async with tractor.open_nursery( + debug_mode=True, + ) as n: portal = await n.run_in_actor( breakpoint_forever, @@ -21,4 +23,4 @@ async def main(): if __name__ == '__main__': - tractor.run(main, debug_mode=True, loglevel='debug') + trio.run(main) diff --git a/examples/debugging/subactor_error.py b/examples/debugging/subactor_error.py index 86bb7ca..e38c161 100644 --- a/examples/debugging/subactor_error.py +++ b/examples/debugging/subactor_error.py @@ -1,3 +1,4 @@ +import trio import tractor @@ -6,11 +7,13 @@ async def name_error(): async def main(): - async with tractor.open_nursery() as n: + async with tractor.open_nursery( + debug_mode=True, + ) as n: portal = await n.run_in_actor(name_error) await portal.result() if __name__ == '__main__': - tractor.run(main, debug_mode=True) + trio.run(main) diff --git a/examples/full_fledged_streaming_service.py b/examples/full_fledged_streaming_service.py index 126eed9..31eff62 100644 --- a/examples/full_fledged_streaming_service.py +++ b/examples/full_fledged_streaming_service.py @@ -68,10 +68,11 @@ async def aggregate(seed): # this is the main actor and *arbiter* async def main(): # a nursery which spawns "actors" - async with tractor.open_nursery() as nursery: + async with tractor.open_nursery( + arbiter_addr=('127.0.0.1', 1616) + ) as nursery: seed = int(1e3) - import time pre_start = time.time() portal = await nursery.start_actor( @@ -100,4 +101,4 @@ async def main(): if __name__ == '__main__': - final_stream = tractor.run(main, arbiter_addr=('127.0.0.1', 1616)) + final_stream = trio.run(main) diff --git a/examples/parallelism/_concurrent_futures_primes.py b/examples/parallelism/_concurrent_futures_primes.py index 81ae23d..7246b86 100644 --- a/examples/parallelism/_concurrent_futures_primes.py +++ b/examples/parallelism/_concurrent_futures_primes.py @@ -10,6 +10,7 @@ PRIMES = [ 115797848077099, 1099726899285419] + def is_prime(n): if n < 2: return False @@ -24,6 +25,7 @@ def is_prime(n): return False return True + def main(): with concurrent.futures.ProcessPoolExecutor() as executor: start = time.time() @@ -33,6 +35,7 @@ def main(): print(f'processing took {time.time() - start} seconds') + if __name__ == '__main__': start = time.time() diff --git a/examples/parallelism/concurrent_actors_primes.py b/examples/parallelism/concurrent_actors_primes.py index a2aec90..a7a7396 100644 --- a/examples/parallelism/concurrent_actors_primes.py +++ b/examples/parallelism/concurrent_actors_primes.py @@ -4,7 +4,7 @@ Demonstration of the prime number detector example from the https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example -This uses no extra threads, fancy semaphores or futures; all we need +This uses no extra threads, fancy semaphores or futures; all we need is ``tractor``'s channels. """ diff --git a/examples/remote_error_propagation.py b/examples/remote_error_propagation.py index 0999d39..aa2a73b 100644 --- a/examples/remote_error_propagation.py +++ b/examples/remote_error_propagation.py @@ -1,3 +1,4 @@ +import trio import tractor @@ -11,7 +12,7 @@ async def main(): for i in range(3): real_actors.append(await n.start_actor( f'actor_{i}', - rpc_module_paths=[__name__], + enable_modules=[__name__], )) # start one actor that will fail immediately @@ -24,6 +25,6 @@ async def main(): if __name__ == '__main__': try: # also raises - tractor.run(main) + trio.run(main) except tractor.RemoteActorError: print("Look Maa that actor failed hard, hehhh!") diff --git a/examples/service_discovery.py b/examples/service_discovery.py index 9088986..858f7f1 100644 --- a/examples/service_discovery.py +++ b/examples/service_discovery.py @@ -1,7 +1,9 @@ +import trio import tractor tractor.log.get_console_log("INFO") + async def main(service_name): async with tractor.open_nursery() as an: @@ -17,4 +19,4 @@ async def main(service_name): if __name__ == '__main__': - tractor.run(main, 'some_actor_name') + trio.run(main, 'some_actor_name') diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index eadcb44..da181c6 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -47,7 +47,9 @@ def test_remote_error(arb_addr, args_err): args, errtype = args_err async def main(): - async with tractor.open_nursery() as nursery: + async with tractor.open_nursery( + arbiter_addr=arb_addr, + ) as nursery: portal = await nursery.run_in_actor( assert_err, name='errorer', **args @@ -62,7 +64,7 @@ def test_remote_error(arb_addr, args_err): raise with pytest.raises(tractor.RemoteActorError) as excinfo: - tractor.run(main, arbiter_addr=arb_addr) + trio.run(main) # ensure boxed error is correct assert excinfo.value.type == errtype @@ -73,7 +75,9 @@ def test_multierror(arb_addr): more then one actor errors. """ async def main(): - async with tractor.open_nursery() as nursery: + async with tractor.open_nursery( + arbiter_addr=arb_addr, + ) as nursery: await nursery.run_in_actor(assert_err, name='errorer1') portal2 = await nursery.run_in_actor(assert_err, name='errorer2') @@ -90,7 +94,7 @@ def test_multierror(arb_addr): # from both subactors with pytest.raises(trio.MultiError): - tractor.run(main, arbiter_addr=arb_addr) + trio.run(main) @pytest.mark.parametrize('delay', (0, 0.5)) @@ -103,7 +107,10 @@ def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay): to test failure during an ongoing spawning. """ async def main(): - async with tractor.open_nursery() as nursery: + async with tractor.open_nursery( + arbiter_addr=arb_addr, + ) as nursery: + for i in range(num_subactors): await nursery.run_in_actor( assert_err, @@ -112,7 +119,7 @@ def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay): ) with pytest.raises(trio.MultiError) as exc_info: - tractor.run(main, arbiter_addr=arb_addr) + trio.run(main) assert exc_info.type == tractor.MultiError err = exc_info.value @@ -134,10 +141,12 @@ def test_cancel_single_subactor(arb_addr, mechanism): async def spawn_actor(): """Spawn an actor that blocks indefinitely. """ - async with tractor.open_nursery() as nursery: + async with tractor.open_nursery( + arbiter_addr=arb_addr, + ) as nursery: portal = await nursery.start_actor( - 'nothin', rpc_module_paths=[__name__], + 'nothin', enable_modules=[__name__], ) assert (await portal.run(do_nothing)) is None @@ -148,10 +157,10 @@ def test_cancel_single_subactor(arb_addr, mechanism): raise mechanism if mechanism == 'nursery_cancel': - tractor.run(spawn_actor, arbiter_addr=arb_addr) + trio.run(spawn_actor) else: with pytest.raises(mechanism): - tractor.run(spawn_actor, arbiter_addr=arb_addr) + trio.run(spawn_actor) async def stream_forever(): @@ -229,7 +238,7 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel): for i in range(num_actors): dactor_portals.append(await n.start_actor( f'deamon_{i}', - rpc_module_paths=[__name__], + enable_modules=[__name__], )) func, kwargs = ria_func @@ -395,7 +404,7 @@ def test_cancel_via_SIGINT( await trio.sleep_forever() with pytest.raises(KeyboardInterrupt): - tractor.run(main) + trio.run(main) @no_windows @@ -430,8 +439,7 @@ def test_cancel_via_SIGINT_other_task( os.kill(pid, signal.SIGINT) with pytest.raises(KeyboardInterrupt): - tractor.run(main) - + trio.run(main) async def spin_for(period=3): "Sync sleep." @@ -470,4 +478,4 @@ def test_cancel_while_childs_child_in_sync_sleep( assert 0 with pytest.raises(AssertionError): - tractor.run(main) + trio.run(main) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index af03ce6..383608e 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -20,8 +20,11 @@ async def test_reg_then_unreg(arb_addr): assert actor.is_arbiter assert len(actor._registry) == 1 # only self is registered - async with tractor.open_nursery() as n: - portal = await n.start_actor('actor', rpc_module_paths=[__name__]) + async with tractor.open_nursery( + arbiter_addr=arb_addr, + ) as n: + + portal = await n.start_actor('actor', enable_modules=[__name__]) uid = portal.channel.uid async with tractor.get_arbiter(*arb_addr) as aportal: @@ -66,7 +69,7 @@ async def say_hello_use_wait(other_actor): @tractor_test @pytest.mark.parametrize('func', [say_hello, say_hello_use_wait]) -async def test_trynamic_trio(func, start_method): +async def test_trynamic_trio(func, start_method, arb_addr): """Main tractor entry point, the "master" process (for now acts as the "director"). """ @@ -119,74 +122,78 @@ async def spawn_and_check_registry( remote_arbiter: bool = False, with_streaming: bool = False, ) -> None: - actor = tractor.current_actor() - if remote_arbiter: - assert not actor.is_arbiter + async with tractor.open_root_actor( + arbiter_addr=arb_addr, + ): + async with tractor.get_arbiter(*arb_addr) as portal: + # runtime needs to be up to call this + actor = tractor.current_actor() - async with tractor.get_arbiter(*arb_addr) as portal: + if remote_arbiter: + assert not actor.is_arbiter - if actor.is_arbiter: + if actor.is_arbiter: - async def get_reg(): - return actor._registry + async def get_reg(): + return actor._registry - extra = 1 # arbiter is local root actor - else: - get_reg = partial(portal.run_from_ns, 'self', 'get_registry') - extra = 2 # local root actor + remote arbiter + extra = 1 # arbiter is local root actor + else: + get_reg = partial(portal.run_from_ns, 'self', 'get_registry') + extra = 2 # local root actor + remote arbiter - # ensure current actor is registered - registry = await get_reg() - assert actor.uid in registry + # ensure current actor is registered + registry = await get_reg() + assert actor.uid in registry - try: - async with tractor.open_nursery() as n: - async with trio.open_nursery() as trion: + try: + async with tractor.open_nursery() as n: + async with trio.open_nursery() as trion: + + portals = {} + for i in range(3): + name = f'a{i}' + if with_streaming: + portals[name] = await n.start_actor( + name=name, enable_modules=[__name__]) + + else: # no streaming + portals[name] = await n.run_in_actor( + trio.sleep_forever, name=name) + + # wait on last actor to come up + async with tractor.wait_for_actor(name): + registry = await get_reg() + for uid in n._children: + assert uid in registry + + assert len(portals) + extra == len(registry) - portals = {} - for i in range(3): - name = f'a{i}' if with_streaming: - portals[name] = await n.start_actor( - name=name, enable_modules=[__name__]) + await trio.sleep(0.1) - else: # no streaming - portals[name] = await n.run_in_actor( - trio.sleep_forever, name=name) + pts = list(portals.values()) + for p in pts[:-1]: + trion.start_soon(stream_from, p) - # wait on last actor to come up - async with tractor.wait_for_actor(name): - registry = await get_reg() - for uid in n._children: - assert uid in registry + # stream for 1 sec + trion.start_soon(cancel, use_signal, 1) - assert len(portals) + extra == len(registry) + last_p = pts[-1] + await stream_from(last_p) - if with_streaming: - await trio.sleep(0.1) + else: + await cancel(use_signal) - pts = list(portals.values()) - for p in pts[:-1]: - trion.start_soon(stream_from, p) + finally: + with trio.CancelScope(shield=True): + await trio.sleep(0.5) - # stream for 1 sec - trion.start_soon(cancel, use_signal, 1) - - last_p = pts[-1] - await stream_from(last_p) - - else: - await cancel(use_signal) - - finally: - with trio.CancelScope(shield=True): - await trio.sleep(0.5) - - # all subactors should have de-registered - registry = await get_reg() - assert len(registry) == extra - assert actor.uid in registry + # all subactors should have de-registered + registry = await get_reg() + assert len(registry) == extra + assert actor.uid in registry @pytest.mark.parametrize('use_signal', [False, True]) @@ -201,7 +208,7 @@ def test_subactors_unregister_on_cancel( deregistering themselves with the arbiter. """ with pytest.raises(KeyboardInterrupt): - tractor.run( + trio.run( partial( spawn_and_check_registry, arb_addr, @@ -209,7 +216,6 @@ def test_subactors_unregister_on_cancel( remote_arbiter=False, with_streaming=with_streaming, ), - arbiter_addr=arb_addr ) @@ -227,7 +233,7 @@ def test_subactors_unregister_on_cancel_remote_daemon( tree) arbiter. """ with pytest.raises(KeyboardInterrupt): - tractor.run( + trio.run( partial( spawn_and_check_registry, arb_addr, @@ -235,8 +241,6 @@ def test_subactors_unregister_on_cancel_remote_daemon( remote_arbiter=True, with_streaming=with_streaming, ), - # XXX: required to use remote daemon! - arbiter_addr=arb_addr ) @@ -258,49 +262,52 @@ async def close_chans_before_nursery( else: entries_at_end = 1 - async with tractor.get_arbiter(*arb_addr) as aportal: - try: - get_reg = partial(aportal.run_from_ns, 'self', 'get_registry') + async with tractor.open_root_actor( + arbiter_addr=arb_addr, + ): + async with tractor.get_arbiter(*arb_addr) as aportal: + try: + get_reg = partial(aportal.run_from_ns, 'self', 'get_registry') - async with tractor.open_nursery() as tn: - portal1 = await tn.start_actor( - name='consumer1', enable_modules=[__name__]) - portal2 = await tn.start_actor( - 'consumer2', enable_modules=[__name__]) + async with tractor.open_nursery() as tn: + portal1 = await tn.start_actor( + name='consumer1', enable_modules=[__name__]) + portal2 = await tn.start_actor( + 'consumer2', enable_modules=[__name__]) - # TODO: compact this back as was in last commit once - # 3.9+, see https://github.com/goodboy/tractor/issues/207 - async with portal1.open_stream_from(stream_forever) as agen1: - async with portal2.open_stream_from( - stream_forever - ) as agen2: - async with trio.open_nursery() as n: - n.start_soon(streamer, agen1) - n.start_soon(cancel, use_signal, .5) - try: - await streamer(agen2) - finally: - # Kill the root nursery thus resulting in - # normal arbiter channel ops to fail during - # teardown. It doesn't seem like this is - # reliably triggered by an external SIGINT. - # tractor.current_actor()._root_nursery.cancel_scope.cancel() + # TODO: compact this back as was in last commit once + # 3.9+, see https://github.com/goodboy/tractor/issues/207 + async with portal1.open_stream_from(stream_forever) as agen1: + async with portal2.open_stream_from( + stream_forever + ) as agen2: + async with trio.open_nursery() as n: + n.start_soon(streamer, agen1) + n.start_soon(cancel, use_signal, .5) + try: + await streamer(agen2) + finally: + # Kill the root nursery thus resulting in + # normal arbiter channel ops to fail during + # teardown. It doesn't seem like this is + # reliably triggered by an external SIGINT. + # tractor.current_actor()._root_nursery.cancel_scope.cancel() - # XXX: THIS IS THE KEY THING that happens - # **before** exiting the actor nursery block + # XXX: THIS IS THE KEY THING that happens + # **before** exiting the actor nursery block - # also kill off channels cuz why not - await agen1.aclose() - await agen2.aclose() - finally: - with trio.CancelScope(shield=True): - await trio.sleep(1) + # also kill off channels cuz why not + await agen1.aclose() + await agen2.aclose() + finally: + with trio.CancelScope(shield=True): + await trio.sleep(1) - # all subactors should have de-registered - registry = await get_reg() - assert portal1.channel.uid not in registry - assert portal2.channel.uid not in registry - assert len(registry) == entries_at_end + # all subactors should have de-registered + registry = await get_reg() + assert portal1.channel.uid not in registry + assert portal2.channel.uid not in registry + assert len(registry) == entries_at_end @pytest.mark.parametrize('use_signal', [False, True]) @@ -314,15 +321,13 @@ def test_close_channel_explicit( results in subactor(s) deregistering from the arbiter. """ with pytest.raises(KeyboardInterrupt): - tractor.run( + trio.run( partial( close_chans_before_nursery, arb_addr, use_signal, remote_arbiter=False, ), - # XXX: required to use remote daemon! - arbiter_addr=arb_addr ) @@ -338,13 +343,11 @@ def test_close_channel_explicit_remote_arbiter( results in subactor(s) deregistering from the arbiter. """ with pytest.raises(KeyboardInterrupt): - tractor.run( + trio.run( partial( close_chans_before_nursery, arb_addr, use_signal, remote_arbiter=True, ), - # XXX: required to use remote daemon! - arbiter_addr=arb_addr ) diff --git a/tests/test_local.py b/tests/test_local.py index 60e875b..016df3f 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -15,8 +15,8 @@ async def test_no_arbitter(): """An arbitter must be established before any nurseries can be created. - (In other words ``tractor.run`` must be used instead of ``trio.run`` as is - done by the ``pytest-trio`` plugin.) + (In other words ``tractor.open_root_actor()`` must be engaged at + some point?) """ with pytest.raises(RuntimeError): with tractor.open_nursery(): @@ -49,7 +49,8 @@ async def test_self_is_registered_localportal(arb_addr): assert isinstance(portal, tractor._portal.LocalPortal) with trio.fail_after(0.2): - sockaddr = await portal.run_from_ns('self', 'wait_for_actor', name='root') + sockaddr = await portal.run_from_ns( + 'self', 'wait_for_actor', name='root') assert sockaddr[0] == arb_addr @@ -59,15 +60,19 @@ def test_local_actor_async_func(arb_addr): nums = [] async def print_loop(): - # arbiter is started in-proc if dne - assert tractor.current_actor().is_arbiter - for i in range(10): - nums.append(i) - await trio.sleep(0.1) + async with tractor.open_root_actor( + arbiter_addr=arb_addr, + ): + # arbiter is started in-proc if dne + assert tractor.current_actor().is_arbiter + + for i in range(10): + nums.append(i) + await trio.sleep(0.1) start = time.time() - tractor.run(print_loop, arbiter_addr=arb_addr) + trio.run(print_loop) # ensure the sleeps were actually awaited assert time.time() - start >= 1 diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py index 12ca3ef..e7a3ac5 100644 --- a/tests/test_multi_program.py +++ b/tests/test_multi_program.py @@ -1,10 +1,11 @@ """ -Multiple python programs invoking ``tractor.run()`` +Multiple python programs invoking the runtime. """ import platform import time import pytest +import trio import tractor from conftest import ( tractor_test, @@ -45,8 +46,13 @@ async def test_cancel_remote_arbiter(daemon, arb_addr): def test_register_duplicate_name(daemon, arb_addr): async def main(): - assert not tractor.current_actor().is_arbiter - async with tractor.open_nursery() as n: + + async with tractor.open_nursery( + arbiter_addr=arb_addr, + ) as n: + + assert not tractor.current_actor().is_arbiter + p1 = await n.start_actor('doggy') p2 = await n.start_actor('doggy') @@ -57,4 +63,4 @@ def test_register_duplicate_name(daemon, arb_addr): # run it manually since we want to start **after** # the other "daemon" program - tractor.run(main, arbiter_addr=arb_addr) + trio.run(main) diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 0d4c62d..a548537 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -46,8 +46,9 @@ async def pubber(get_topics, seed=10): async def subs( - which, pub_actor_name, seed=10, - portal=None, + which, + pub_actor_name, + seed=10, task_status=trio.TASK_STATUS_IGNORED, ): if len(which) == 1: @@ -61,7 +62,9 @@ async def subs( return isinstance(i, int) # TODO: https://github.com/goodboy/tractor/issues/207 - async with tractor.find_actor(pub_actor_name) as portal: + async with tractor.wait_for_actor(pub_actor_name) as portal: + assert portal + async with portal.open_stream_from( pubber, topics=which, @@ -164,7 +167,10 @@ def test_multi_actor_subs_arbiter_pub( async def main(): - async with tractor.open_nursery() as n: + async with tractor.open_nursery( + arbiter_addr=arb_addr, + enable_modules=[__name__], + ) as n: name = 'root' @@ -172,7 +178,7 @@ def test_multi_actor_subs_arbiter_pub( # start the publisher as a daemon master_portal = await n.start_actor( 'streamer', - rpc_module_paths=[__name__], + enable_modules=[__name__], ) even_portal = await n.run_in_actor( @@ -232,7 +238,6 @@ def test_multi_actor_subs_arbiter_pub( assert 'even' not in get_topics() await odd_portal.cancel_actor() - await trio.sleep(2) if pub_actor == 'arbiter': while get_topics(): @@ -242,11 +247,7 @@ def test_multi_actor_subs_arbiter_pub( else: await master_portal.cancel_actor() - tractor.run( - main, - arbiter_addr=arb_addr, - rpc_module_paths=[__name__], - ) + trio.run(main) def test_single_subactor_pub_multitask_subs( @@ -255,11 +256,14 @@ def test_single_subactor_pub_multitask_subs( ): async def main(): - async with tractor.open_nursery() as n: + async with tractor.open_nursery( + arbiter_addr=arb_addr, + enable_modules=[__name__], + ) as n: portal = await n.start_actor( 'streamer', - rpc_module_paths=[__name__], + enable_modules=[__name__], ) async with tractor.wait_for_actor('streamer'): # block until 2nd actor is initialized @@ -283,8 +287,4 @@ def test_single_subactor_pub_multitask_subs( await portal.cancel_actor() - tractor.run( - main, - arbiter_addr=arb_addr, - rpc_module_paths=[__name__], - ) + trio.run(main) diff --git a/tests/test_rpc.py b/tests/test_rpc.py index 2a2b406..6d15896 100644 --- a/tests/test_rpc.py +++ b/tests/test_rpc.py @@ -74,11 +74,15 @@ def test_rpc_errors(arb_addr, to_call, testdir): remote_err = inside_err async def main(): - actor = tractor.current_actor() - assert actor.is_arbiter # spawn a subactor which calls us back - async with tractor.open_nursery() as n: + async with tractor.open_nursery( + arbiter_addr=arb_addr, + enable_modules=exposed_mods.copy(), + ) as n: + + actor = tractor.current_actor() + assert actor.is_arbiter await n.run_in_actor( sleep_back_actor, actor_name=subactor_requests_to, @@ -90,15 +94,11 @@ def test_rpc_errors(arb_addr, to_call, testdir): func_name=funcname, exposed_mods=exposed_mods, func_defined=True if func_defined else False, - rpc_module_paths=subactor_exposed_mods, + enable_modules=subactor_exposed_mods, ) def run(): - tractor.run( - main, - arbiter_addr=arb_addr, - rpc_module_paths=exposed_mods.copy(), - ) + trio.run(main) # handle both parameterized cases if exposed_mods and func_defined: diff --git a/tests/test_spawning.py b/tests/test_spawning.py index fb1e1be..f9eeb16 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -1,7 +1,6 @@ """ Spawning basics """ -from functools import partial import pytest import trio @@ -12,41 +11,50 @@ from conftest import tractor_test data_to_pass_down = {'doggy': 10, 'kitty': 4} -async def spawn(is_arbiter, data): +async def spawn(is_arbiter, data, arb_addr): namespaces = [__name__] await trio.sleep(0.1) - actor = tractor.current_actor() - assert actor.is_arbiter == is_arbiter - data == data_to_pass_down - if actor.is_arbiter: - async with tractor.open_nursery() as nursery: - # forks here - portal = await nursery.run_in_actor( - spawn, - is_arbiter=False, - name='sub-actor', - data=data, - rpc_module_paths=namespaces, - ) + async with tractor.open_root_actor( + arbiter_addr=arb_addr, + ): - assert len(nursery._children) == 1 - assert portal.channel.uid in tractor.current_actor()._peers - # be sure we can still get the result - result = await portal.result() - assert result == 10 - return result - else: - return 10 + actor = tractor.current_actor() + assert actor.is_arbiter == is_arbiter + data = data_to_pass_down + + if actor.is_arbiter: + + async with tractor.open_nursery( + ) as nursery: + + # forks here + portal = await nursery.run_in_actor( + spawn, + is_arbiter=False, + name='sub-actor', + data=data, + arb_addr=arb_addr, + enable_modules=namespaces, + ) + + assert len(nursery._children) == 1 + assert portal.channel.uid in tractor.current_actor()._peers + # be sure we can still get the result + result = await portal.result() + assert result == 10 + return result + else: + return 10 def test_local_arbiter_subactor_global_state(arb_addr): - result = tractor.run( - partial(spawn, data=data_to_pass_down), + result = trio.run( + spawn, True, - name='arbiter', - arbiter_addr=arb_addr, + data_to_pass_down, + arb_addr, ) assert result == 10 @@ -67,7 +75,7 @@ async def test_movie_theatre_convo(start_method): portal = await n.start_actor( 'frank', # enable the actor to run funcs from this current module - rpc_module_paths=[__name__], + enable_modules=[__name__], ) print(await portal.run(movie_theatre_question)) @@ -121,19 +129,20 @@ def test_loglevel_propagated_to_subactor( level = 'critical' async def main(): - async with tractor.open_nursery() as tn: + async with tractor.open_nursery( + name='arbiter', + loglevel=level, + start_method=start_method, + arbiter_addr=arb_addr, + + ) as tn: await tn.run_in_actor( check_loglevel, level=level, ) - tractor.run( - main, - name='arbiter', - loglevel=level, - start_method=start_method, - arbiter_addr=arb_addr, - ) + trio.run(main) + # ensure subactor spits log message on stderr captured = capfd.readouterr() assert 'yoyoyo' in captured.err diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 9aba327..8d8169e 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -50,14 +50,24 @@ async def context_stream(ctx, sequence): assert cs.cancelled_caught -async def stream_from_single_subactor(stream_func): +async def stream_from_single_subactor( + arb_addr, + start_method, + stream_func, +): """Verify we can spawn a daemon actor and retrieve streamed data. """ - async with tractor.find_actor('streamerd') as portals: + # only one per host address, spawns an actor if None + + async with tractor.open_nursery( + arbiter_addr=arb_addr, + start_method=start_method, + ) as nursery: + + async with tractor.find_actor('streamerd') as portals: + + if not portals: - if not portals: - # only one per host address, spawns an actor if None - async with tractor.open_nursery() as nursery: # no brokerd actor found portal = await nursery.start_actor( 'streamerd', @@ -101,13 +111,13 @@ async def stream_from_single_subactor(stream_func): def test_stream_from_single_subactor(arb_addr, start_method, stream_func): """Verify streaming from a spawned async generator. """ - tractor.run( + trio.run( partial( stream_from_single_subactor, + arb_addr, + start_method, stream_func=stream_func, ), - arbiter_addr=arb_addr, - start_method=start_method, ) @@ -208,9 +218,10 @@ async def a_quadruple_example(): return result_stream -async def cancel_after(wait): - with trio.move_on_after(wait): - return await a_quadruple_example() +async def cancel_after(wait, arb_addr): + async with tractor.open_root_actor(arbiter_addr=arb_addr): + with trio.move_on_after(wait): + return await a_quadruple_example() @pytest.fixture(scope='module') @@ -222,7 +233,7 @@ def time_quad_ex(arb_addr, ci_env, spawn_backend): timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4 start = time.time() - results = tractor.run(cancel_after, timeout, arbiter_addr=arb_addr) + results = trio.run(cancel_after, timeout, arb_addr) diff = time.time() - start assert results return results, diff @@ -249,7 +260,7 @@ def test_not_fast_enough_quad( """ results, diff = time_quad_ex delay = max(diff - cancel_delay, 0) - results = tractor.run(cancel_after, delay, arbiter_addr=arb_addr) + results = trio.run(cancel_after, delay, arb_addr) system = platform.system() if system in ('Windows', 'Darwin') and results is not None: # In CI envoirments it seems later runs are quicker then the first diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 4674bd9..dcf7aa5 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -113,6 +113,7 @@ class ActorNursery: name: Optional[str] = None, bind_addr: Tuple[str, int] = _default_bind_addr, rpc_module_paths: Optional[List[str]] = None, + enable_modules: List[str] = None, loglevel: str = None, # set log level per subactor **kwargs, # explicit args to ``fn`` ) -> Portal: @@ -131,7 +132,9 @@ class ActorNursery: portal = await self.start_actor( name, - rpc_module_paths=[mod_path] + (rpc_module_paths or []), + enable_modules=[mod_path] + ( + enable_modules or rpc_module_paths or [] + ), bind_addr=bind_addr, loglevel=loglevel, # use the run_in_actor nursery @@ -366,7 +369,6 @@ async def open_nursery( async with _open_and_supervise_one_cancels_all_nursery( actor ) as anursery: - yield anursery else: # sub-nursery case