diff --git a/tests/test_tractor.py b/tests/test_tractor.py index 653170b..8492664 100644 --- a/tests/test_tractor.py +++ b/tests/test_tractor.py @@ -2,13 +2,34 @@ Actor model API testing """ import time -from functools import partial +from functools import partial, wraps +import random import pytest import trio import tractor +_arb_addr = '127.0.0.1', random.randint(1000, 9999) + + +def tractor_test(fn): + """ + Use: + + @tractor_test + async def test_whatever(): + await ... + """ + @wraps(fn) + def wrapper(*args, **kwargs): + __tracebackhide__ = True + return tractor.run( + partial(fn, *args), arbiter_addr=_arb_addr, **kwargs) + + return wrapper + + @pytest.mark.trio async def test_no_arbitter(): """An arbitter must be established before any nurseries @@ -36,7 +57,7 @@ def test_local_actor_async_func(): await trio.sleep(0.1) start = time.time() - tractor.run(print_loop) + tractor.run(print_loop, arbiter_addr=_arb_addr) # ensure the sleeps were actually awaited assert time.time() - start >= 1 @@ -82,6 +103,7 @@ def test_local_arbiter_subactor_global_state(): True, name='arbiter', statespace=statespace, + arbiter_addr=_arb_addr, ) assert result == 10 @@ -135,7 +157,7 @@ async def stream_from_single_subactor(): def test_stream_from_single_subactor(): """Verify streaming from a spawned async generator. """ - tractor.run(stream_from_single_subactor) + tractor.run(stream_from_single_subactor, arbiter_addr=_arb_addr) async def assert_err(): @@ -162,7 +184,92 @@ def test_remote_error(): with pytest.raises(tractor.RemoteActorError): # also raises - tractor.run(main) + tractor.run(main, arbiter_addr=_arb_addr) + + +the_line = 'Hi my name is {}' + + +async def hi(): + return the_line.format(tractor.current_actor().name) + + +async def say_hello(other_actor): + await trio.sleep(0.4) # wait for other actor to spawn + async with tractor.find_actor(other_actor) as portal: + return await portal.run(__name__, 'hi') + + +@tractor_test +async def test_trynamic_trio(): + """Main tractor entry point, the "master" process (for now + acts as the "director"). + """ + async with tractor.open_nursery() as n: + print("Alright... Action!") + + donny = await n.start_actor( + 'donny', + main=partial(say_hello, 'gretchen'), + rpc_module_paths=[__name__], + outlive_main=True + ) + gretchen = await n.start_actor( + 'gretchen', + main=partial(say_hello, 'donny'), + rpc_module_paths=[__name__], + # outlive_main=True + ) + print(await gretchen.result()) + print(await donny.result()) + await donny.cancel_actor() + print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...") + + +def movie_theatre_question(): + """A question asked in a dark theatre, in a tangent + (errr, I mean different) process. + """ + return 'have you ever seen a portal?' + + +@tractor_test +async def test_movie_theatre_convo(): + """The main ``tractor`` routine. + """ + async with tractor.open_nursery() as n: + portal = await n.start_actor( + 'frank', + # enable the actor to run funcs from this current module + rpc_module_paths=[__name__], + outlive_main=True, + ) + + print(await portal.run(__name__, 'movie_theatre_question')) + # calls the subactor a 2nd time + print(await portal.run(__name__, 'movie_theatre_question')) + + # the async with will block here indefinitely waiting + # for our actor "frank" to complete, but since it's an + # "outlive_main" actor it will never until cancelled + await portal.cancel_actor() + + +def cellar_door(): + return "Dang that's beautiful" + + +@tractor_test +async def test_most_beautiful_word(): + """The main ``tractor`` routine. + """ + async with tractor.open_nursery() as n: + portal = await n.start_actor('some_linguist', main=cellar_door) + + # The ``async with`` will unblock here since the 'some_linguist' + # actor has completed its main task ``cellar_door``. + + print(await portal.result()) def do_nothing(): @@ -178,18 +285,17 @@ def test_cancel_single_subactor(): portal = await nursery.start_actor( 'nothin', rpc_module_paths=[__name__], ) - assert not await portal.run(__name__, 'do_nothing') + assert (await portal.run(__name__, 'do_nothing')) is None # would hang otherwise await nursery.cancel() - tractor.run(main) + tractor.run(main, arbiter_addr=_arb_addr) async def stream_data(seed): for i in range(seed): yield i - # await trio.sleep(1/10000.) # trigger scheduler await trio.sleep(0) # trigger scheduler @@ -209,7 +315,7 @@ async def aggregate(seed): portals.append(portal) - q = trio.Queue(int(1e3)) + q = trio.Queue(500) async def push_to_q(portal): async for value in await portal.run( @@ -231,11 +337,11 @@ async def aggregate(seed): unique_vals.add(value) # yield upwards to the spawning parent actor yield value - continue + + if value is None: + break assert value in unique_vals - if value is None: - break print("FINISHED ITERATING in aggregator") @@ -244,13 +350,15 @@ async def aggregate(seed): print("AGGREGATOR COMPLETE!") -async def main(): +# @tractor_test +async def a_quadruple_example(): # a nursery which spawns "actors" async with tractor.open_nursery() as nursery: seed = int(10) import time pre_start = time.time() + portal = await nursery.start_actor( name='aggregator', # executed in the actor's "main task" immediately @@ -269,13 +377,21 @@ async def main(): assert result_stream == list(range(seed)) + [None] -def test_show_me_the_code(): +async def cancel_after(wait): + with trio.move_on_after(wait): + await a_quadruple_example() + + +def test_a_quadruple_example(): """Verify the *show me the code* readme example works. """ - tractor.run(main, arbiter_addr=('127.0.0.1', 1616)) + tractor.run(cancel_after, 2, arbiter_addr=_arb_addr) -def test_cancel_smtc(): - """Verify we can cancel midway through the smtc example gracefully. +def test_not_fast_enough_quad(): + """Verify we can cancel midway through the quad example and all actors + cancel gracefully. + + This also serves as a kind of "we'd like to eventually be this fast test". """ - pass + tractor.run(cancel_after, 1, arbiter_addr=_arb_addr)