From 78809345056426a75202265963e293c980288de1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Feb 2020 02:01:39 -0500 Subject: [PATCH] Add tests for all docs examples Parametrize our docs example test to include all (now fixed) examples from the `READ.rst`. The examples themselves have been fixed/corrected to run but they haven't yet been updated in the actual docs. Once #99 lands these example scripts will be directly included in our documentation so there will be no possibility of presenting incorrect examples to our users! This technically fixes #108 even though the new example aren't going to be included directly in our docs until #99 lands. --- examples/actor_spawning_and_causality.py | 22 +++++ examples/asynchronous_generators.py | 36 ++++++++ examples/full_fledged_streaming_service.py | 96 ++++++++++++++++++++++ examples/service_discovery.py | 20 +++++ tests/test_docs_examples.py | 18 ++-- 5 files changed, 184 insertions(+), 8 deletions(-) create mode 100644 examples/actor_spawning_and_causality.py create mode 100644 examples/asynchronous_generators.py create mode 100644 examples/full_fledged_streaming_service.py create mode 100644 examples/service_discovery.py diff --git a/examples/actor_spawning_and_causality.py b/examples/actor_spawning_and_causality.py new file mode 100644 index 0000000..fc0d7c7 --- /dev/null +++ b/examples/actor_spawning_and_causality.py @@ -0,0 +1,22 @@ +import tractor + + +def cellar_door(): + return "Dang that's beautiful" + + +async def main(): + """The main ``tractor`` routine. + """ + async with tractor.open_nursery() as n: + + portal = await n.run_in_actor('some_linguist', cellar_door) + + # The ``async with`` will unblock here since the 'some_linguist' + # actor has completed its main task ``cellar_door``. + + print(await portal.result()) + + +if __name__ == '__main__': + tractor.run(main) diff --git a/examples/asynchronous_generators.py b/examples/asynchronous_generators.py new file mode 100644 index 0000000..7ca0beb --- /dev/null +++ b/examples/asynchronous_generators.py @@ -0,0 +1,36 @@ +from itertools import repeat +import trio +import tractor + +tractor.log.get_console_log("INFO") + + +async def stream_forever(): + for i in repeat("I can see these little future bubble things"): + # each yielded value is sent over the ``Channel`` to the + # parent actor + yield i + await trio.sleep(0.01) + + +async def main(): + # stream for at most 1 seconds + with trio.move_on_after(1) as cancel_scope: + async with tractor.open_nursery() as n: + portal = await n.start_actor( + f'donny', + rpc_module_paths=[__name__], + ) + + # this async for loop streams values from the above + # async generator running in a separate process + async for letter in await portal.run(__name__, 'stream_forever'): + print(letter) + + # we support trio's cancellation system + assert cancel_scope.cancelled_caught + assert n.cancelled + + +if __name__ == '__main__': + tractor.run(main, start_method='forkserver') diff --git a/examples/full_fledged_streaming_service.py b/examples/full_fledged_streaming_service.py new file mode 100644 index 0000000..e86c4ed --- /dev/null +++ b/examples/full_fledged_streaming_service.py @@ -0,0 +1,96 @@ +import time +import trio +import tractor + + +# this is the first 2 actors, streamer_1 and streamer_2 +async def stream_data(seed): + for i in range(seed): + yield i + await trio.sleep(0) # trigger scheduler + + +# this is the third actor; the aggregator +async def aggregate(seed): + """Ensure that the two streams we receive match but only stream + a single set of values to the parent. + """ + async with tractor.open_nursery() as nursery: + portals = [] + for i in range(1, 3): + # fork point + portal = await nursery.start_actor( + name=f'streamer_{i}', + rpc_module_paths=[__name__], + ) + + portals.append(portal) + + send_chan, recv_chan = trio.open_memory_channel(500) + + async def push_to_chan(portal, send_chan): + async with send_chan: + async for value in await portal.run( + __name__, 'stream_data', seed=seed + ): + # leverage trio's built-in backpressure + await send_chan.send(value) + + print(f"FINISHED ITERATING {portal.channel.uid}") + + # spawn 2 trio tasks to collect streams and push to a local queue + async with trio.open_nursery() as n: + + for portal in portals: + n.start_soon(push_to_chan, portal, send_chan.clone()) + + # close this local task's reference to send side + await send_chan.aclose() + + unique_vals = set() + async with recv_chan: + async for value in recv_chan: + if value not in unique_vals: + unique_vals.add(value) + # yield upwards to the spawning parent actor + yield value + + assert value in unique_vals + + print("FINISHED ITERATING in aggregator") + + await nursery.cancel() + print("WAITING on `ActorNursery` to finish") + print("AGGREGATOR COMPLETE!") + + +# this is the main actor and *arbiter* +async def main(): + # a nursery which spawns "actors" + async with tractor.open_nursery() as nursery: + + seed = int(1e3) + import time + pre_start = time.time() + + portal = await nursery.run_in_actor( + 'aggregator', + aggregate, + seed=seed, + ) + + start = time.time() + # the portal call returns exactly what you'd expect + # as if the remote "aggregate" function was called locally + result_stream = [] + async for value in await portal.result(): + result_stream.append(value) + + print(f"STREAM TIME = {time.time() - start}") + print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") + assert result_stream == list(range(seed)) + return result_stream + + +if __name__ == '__main__': + final_stream = tractor.run(main, arbiter_addr=('127.0.0.1', 1616)) diff --git a/examples/service_discovery.py b/examples/service_discovery.py new file mode 100644 index 0000000..9088986 --- /dev/null +++ b/examples/service_discovery.py @@ -0,0 +1,20 @@ +import tractor + +tractor.log.get_console_log("INFO") + +async def main(service_name): + + async with tractor.open_nursery() as an: + await an.start_actor(service_name) + + async with tractor.get_arbiter('127.0.0.1', 1616) as portal: + print(f"Arbiter is listening on {portal.channel}") + + async with tractor.wait_for_actor(service_name) as sockaddr: + print(f"my_service is found at {sockaddr}") + + await an.cancel() + + +if __name__ == '__main__': + tractor.run(main, 'some_actor_name') diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index c118ee1..c6bce2e 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -11,7 +11,6 @@ import shutil import pytest -@pytest.fixture(scope='session') def confdir(): dirname = os.path.dirname dirpath = os.path.abspath( @@ -20,13 +19,12 @@ def confdir(): return dirpath -@pytest.fixture -def examples_dir(confdir): - return os.path.join(confdir, 'examples') +def examples_dir(): + return os.path.join(confdir(), 'examples') @pytest.fixture -def run_example_in_subproc(loglevel, testdir, arb_addr, examples_dir): +def run_example_in_subproc(loglevel, testdir, arb_addr): @contextmanager def run(script_code): @@ -36,7 +34,7 @@ def run_example_in_subproc(loglevel, testdir, arb_addr, examples_dir): # on windows we need to create a special __main__.py which will # be executed with ``python -m __main__.py`` on windows.. shutil.copyfile( - os.path.join(examples_dir, '__main__.py'), + os.path.join(examples_dir(), '__main__.py'), os.path.join(str(testdir), '__main__.py') ) @@ -75,8 +73,12 @@ def run_example_in_subproc(loglevel, testdir, arb_addr, examples_dir): yield run -def test_example(examples_dir, run_example_in_subproc): - ex_file = os.path.join(examples_dir, 'a_trynamic_first_scene.py') +@pytest.mark.parametrize( + 'example_script', + [f for f in os.listdir(examples_dir()) if '__' not in f], +) +def test_example(run_example_in_subproc, example_script): + ex_file = os.path.join(examples_dir(), example_script) with open(ex_file, 'r') as ex: code = ex.read()