From c6f3ab5ae2ac1a943cd1d09251fc1a7b109fe01f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 31 Jan 2020 17:06:26 -0500 Subject: [PATCH 1/8] Initial examples testing attempt A per #98 we need tests for examples from the docs as they would be run by a user copy and pasting the code. This adds a small system for loading examples from an "examples/" directory and executing them in a subprocess while checking the output. We can use this to also verify end-to-end expected logging output on std streams (ex. logging on stderr). To expand this further we can parameterize the test list using the contents of the examples directory instead of hardcoding the script names as I've done here initially. Also, fix up the current readme examples to have the required/proper `if __name__ == '__main__'` script guard. --- examples/a_trynamic_first_scene.py | 44 ++++++++++++++++++++ tests/test_docs_examples.py | 65 ++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+) create mode 100644 examples/a_trynamic_first_scene.py create mode 100644 tests/test_docs_examples.py diff --git a/examples/a_trynamic_first_scene.py b/examples/a_trynamic_first_scene.py new file mode 100644 index 0000000..8f43b9f --- /dev/null +++ b/examples/a_trynamic_first_scene.py @@ -0,0 +1,44 @@ +import tractor +from functools import partial + +_this_module = __name__ +the_line = 'Hi my name is {}' + + +tractor.log.get_console_log("INFO") + + +async def hi(): + return the_line.format(tractor.current_actor().name) + + +async def say_hello(other_actor): + async with tractor.wait_for_actor(other_actor) as portal: + return await portal.run(_this_module, 'hi') + + +async def main(): + """Main tractor entry point, the "master" process (for now + acts as the "director"). + """ + async with tractor.open_nursery() as n: + print("Alright... Action!") + + donny = await n.run_in_actor( + 'donny', + say_hello, + # arguments are always named + other_actor='gretchen', + ) + gretchen = await n.run_in_actor( + 'gretchen', + say_hello, + other_actor='donny', + ) + print(await gretchen.result()) + print(await donny.result()) + print("CUTTTT CUUTT CUT!!! Donny!! You're supposed to say...") + + +if __name__ == '__main__': + tractor.run(main) diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py new file mode 100644 index 0000000..87f7666 --- /dev/null +++ b/tests/test_docs_examples.py @@ -0,0 +1,65 @@ +""" +Let's make sure them docs work yah? +""" +from contextlib import contextmanager +import os +import sys +import subprocess +import platform +import pprint + +import pytest + + +@pytest.fixture(scope='session') +def confdir(): + dirname = os.path.dirname + dirpath = os.path.abspath( + dirname(dirname(os.path.realpath(__file__))) + ) + return dirpath + + +@pytest.fixture +def run_example_in_subproc(loglevel, testdir, arb_addr): + + @contextmanager + def run(script_code): + script_file = testdir.makefile('.py', script_code) + cmdargs = [ + sys.executable, + str(script_file), + ] + kwargs = dict() + if platform.system() == 'Windows': + # without this, tests hang on windows forever + kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP + + proc = testdir.popen( + cmdargs, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + **kwargs, + ) + assert not proc.returncode + yield proc + proc.wait() + assert proc.returncode == 0 + + yield run + + +def test_a_trynamic_first_scene(confdir, run_example_in_subproc): + ex_file = os.path.join(confdir, 'examples', 'a_trynamic_first_scene.py') + with open(ex_file, 'r') as ex: + code = ex.read() + + with run_example_in_subproc(code) as proc: + proc.wait() + err, _ = proc.stderr.read(), proc.stdout.read() + + # if we get some gnarly output let's aggregate and raise + if err and b'Error' in err: + raise Exception(err.decode()) + + assert proc.returncode == 0 From 00fc7345807844fd65a545a9efb06e3e694009b0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 7 Feb 2020 20:01:41 -0500 Subject: [PATCH 2/8] Fix missing `_ctx` define when on Windows --- tractor/_spawn.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 9421d21..5700131 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -30,20 +30,21 @@ from ._actor import Actor, ActorFailure log = get_logger('tractor') -# use trip as our default on *nix systems for now -if platform.system() != 'Windows': - _spawn_method: str = "trio_run_in_process" -else: - _spawn_method = "spawn" - +# placeholder for an mp start context if so using that backend _ctx: Optional[mp.context.BaseContext] = None +_spawn_method: str = "spawn" if platform.system() == 'Windows': + _spawn_method = "spawn" + _ctx = mp.get_context("spawn") + async def proc_waiter(proc: mp.Process) -> None: await trio.hazmat.WaitForSingleObject(proc.sentinel) else: + # *NIX systems use ``trio_run_in_process` as our default (for now) import trio_run_in_process + _spawn_method = "trio_run_in_process" async def proc_waiter(proc: mp.Process) -> None: await trio.hazmat.wait_readable(proc.sentinel) @@ -53,9 +54,9 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: """Attempt to set the start method for process starting, aka the "actor spawning backend". - If the desired method is not supported this function will error. On Windows - the only supported option is the ``multiprocessing`` "spawn" method. The default - on *nix systems is ``trio_run_in_process``. + If the desired method is not supported this function will error. On + Windows the only supported option is the ``multiprocessing`` "spawn" + method. The default on *nix systems is ``trio_run_in_process``. """ global _ctx global _spawn_method From 70636a98f67d24e1cdfc17a9003182c7f9a2fbc0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 7 Feb 2020 20:20:46 -0500 Subject: [PATCH 3/8] Use the windows "gotchcas" fix for example tests Apply the fix from @chrizzFTD where we invoke the entry point using module exec mode on a ``__main__.py`` and import the ``test_example::`main()` from within that entry point script. --- examples/__main__.py | 11 ++++++++ examples/a_trynamic_first_scene.py | 2 +- tests/test_docs_examples.py | 44 ++++++++++++++++++++++++------ 3 files changed, 47 insertions(+), 10 deletions(-) create mode 100644 examples/__main__.py diff --git a/examples/__main__.py b/examples/__main__.py new file mode 100644 index 0000000..7cb2506 --- /dev/null +++ b/examples/__main__.py @@ -0,0 +1,11 @@ +""" +Needed in windows. +This needs to be the main program as it will be +called '__mp_main__' by the multiprocessing module + +""" +if __name__ == '__main__': + import multiprocessing + multiprocessing.freeze_support() + from . import test_example + test_example.tractor.run(test_example.main, start_method='spawn') diff --git a/examples/a_trynamic_first_scene.py b/examples/a_trynamic_first_scene.py index 8f43b9f..8197dc8 100644 --- a/examples/a_trynamic_first_scene.py +++ b/examples/a_trynamic_first_scene.py @@ -1,5 +1,5 @@ +import platform import tractor -from functools import partial _this_module = __name__ the_line = 'Hi my name is {}' diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index 87f7666..c118ee1 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -6,7 +6,7 @@ import os import sys import subprocess import platform -import pprint +import shutil import pytest @@ -21,20 +21,46 @@ def confdir(): @pytest.fixture -def run_example_in_subproc(loglevel, testdir, arb_addr): +def examples_dir(confdir): + return os.path.join(confdir, 'examples') + + +@pytest.fixture +def run_example_in_subproc(loglevel, testdir, arb_addr, examples_dir): @contextmanager def run(script_code): - script_file = testdir.makefile('.py', script_code) - cmdargs = [ - sys.executable, - str(script_file), - ] kwargs = dict() + if platform.system() == 'Windows': + # on windows we need to create a special __main__.py which will + # be executed with ``python -m __main__.py`` on windows.. + shutil.copyfile( + os.path.join(examples_dir, '__main__.py'), + os.path.join(str(testdir), '__main__.py') + ) + + # drop the ``if __name__ == '__main__'`` guard + script_code = '\n'.join(script_code.splitlines()[:-4]) + script_file = testdir.makefile('.py', script_code) + # without this, tests hang on windows forever kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP + # run the "libary module" as a script + cmdargs = [ + sys.executable, + '-m', + # use the "module name" of this "package" + 'test_example' + ] + else: + script_file = testdir.makefile('.py', script_code) + cmdargs = [ + sys.executable, + str(script_file), + ] + proc = testdir.popen( cmdargs, stdout=subprocess.PIPE, @@ -49,8 +75,8 @@ def run_example_in_subproc(loglevel, testdir, arb_addr): yield run -def test_a_trynamic_first_scene(confdir, run_example_in_subproc): - ex_file = os.path.join(confdir, 'examples', 'a_trynamic_first_scene.py') +def test_example(examples_dir, run_example_in_subproc): + ex_file = os.path.join(examples_dir, 'a_trynamic_first_scene.py') with open(ex_file, 'r') as ex: code = ex.read() From 596aca8097fd82006d9f257e0aa849ae40b9a775 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Feb 2020 01:05:52 -0500 Subject: [PATCH 4/8] Alias __mp_main__ at import time --- tractor/_actor.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 7118a3e..443c48c 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -270,7 +270,10 @@ class Actor: # should allow for relative (at least downward) imports. sys.path.append(os.path.dirname(filepath)) log.debug(f"Attempting to import {modpath}@{filepath}") - self._mods[modpath] = importlib.import_module(modpath) + mod = importlib.import_module(modpath) + self._mods[modpath] = mod + if modpath == '__main__': + self._mods['__mp_main__'] = mod except ModuleNotFoundError: # it is expected the corresponding `ModuleNotExposed` error # will be raised later @@ -278,11 +281,6 @@ class Actor: raise def _get_rpc_func(self, ns, funcname): - if ns == "__mp_main__": - # In subprocesses, `__main__` will actually map to - # `__mp_main__` which should be the same entry-point-module - # as the parent. - ns = "__main__" try: return getattr(self._mods[ns], funcname) except KeyError as err: From 9fb05d88498b14286cad14356371fb9cb0b6212e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Feb 2020 01:06:11 -0500 Subject: [PATCH 5/8] Drop uneeded import --- README.rst | 1 - 1 file changed, 1 deletion(-) diff --git a/README.rst b/README.rst index 2dabf6b..6664828 100644 --- a/README.rst +++ b/README.rst @@ -108,7 +108,6 @@ the hip new film we're shooting: .. code:: python import tractor - from functools import partial _this_module = __name__ the_line = 'Hi my name is {}' From 30f8dd8be463c4c43da3e9bec1f8322ad165aa75 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Feb 2020 01:59:10 -0500 Subject: [PATCH 6/8] Pass a `Channel` to `LocalPortal` for compat purposes --- tractor/_discovery.py | 7 ++++--- tractor/_portal.py | 1 + 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/tractor/_discovery.py b/tractor/_discovery.py index ae2e3be..3667c90 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -5,7 +5,7 @@ import typing from typing import Tuple, Optional, Union from async_generator import asynccontextmanager -from ._ipc import _connect_chan +from ._ipc import _connect_chan, Channel from ._portal import ( Portal, open_portal, @@ -16,7 +16,8 @@ from ._state import current_actor @asynccontextmanager async def get_arbiter( - host: str, port: int + host: str, + port: int, ) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: """Return a portal instance connected to a local or remote arbiter. @@ -28,7 +29,7 @@ async def get_arbiter( if actor.is_arbiter: # we're already the arbiter # (likely a re-entrant call from the arbiter actor) - yield LocalPortal(actor) + yield LocalPortal(actor, Channel((host, port))) else: async with _connect_chan(host, port) as chan: async with open_portal(chan) as arb_portal: diff --git a/tractor/_portal.py b/tractor/_portal.py index 8f5899b..89cab39 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -298,6 +298,7 @@ class LocalPortal: using an in process actor instance. """ actor: 'Actor' # type: ignore + channel: Channel async def run(self, ns: str, func_name: str, **kwargs) -> Any: """Run a requested function locally and return it's result. From 78809345056426a75202265963e293c980288de1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Feb 2020 02:01:39 -0500 Subject: [PATCH 7/8] Add tests for all docs examples Parametrize our docs example test to include all (now fixed) examples from the `READ.rst`. The examples themselves have been fixed/corrected to run but they haven't yet been updated in the actual docs. Once #99 lands these example scripts will be directly included in our documentation so there will be no possibility of presenting incorrect examples to our users! This technically fixes #108 even though the new example aren't going to be included directly in our docs until #99 lands. --- examples/actor_spawning_and_causality.py | 22 +++++ examples/asynchronous_generators.py | 36 ++++++++ examples/full_fledged_streaming_service.py | 96 ++++++++++++++++++++++ examples/service_discovery.py | 20 +++++ tests/test_docs_examples.py | 18 ++-- 5 files changed, 184 insertions(+), 8 deletions(-) create mode 100644 examples/actor_spawning_and_causality.py create mode 100644 examples/asynchronous_generators.py create mode 100644 examples/full_fledged_streaming_service.py create mode 100644 examples/service_discovery.py diff --git a/examples/actor_spawning_and_causality.py b/examples/actor_spawning_and_causality.py new file mode 100644 index 0000000..fc0d7c7 --- /dev/null +++ b/examples/actor_spawning_and_causality.py @@ -0,0 +1,22 @@ +import tractor + + +def cellar_door(): + return "Dang that's beautiful" + + +async def main(): + """The main ``tractor`` routine. + """ + async with tractor.open_nursery() as n: + + portal = await n.run_in_actor('some_linguist', cellar_door) + + # The ``async with`` will unblock here since the 'some_linguist' + # actor has completed its main task ``cellar_door``. + + print(await portal.result()) + + +if __name__ == '__main__': + tractor.run(main) diff --git a/examples/asynchronous_generators.py b/examples/asynchronous_generators.py new file mode 100644 index 0000000..7ca0beb --- /dev/null +++ b/examples/asynchronous_generators.py @@ -0,0 +1,36 @@ +from itertools import repeat +import trio +import tractor + +tractor.log.get_console_log("INFO") + + +async def stream_forever(): + for i in repeat("I can see these little future bubble things"): + # each yielded value is sent over the ``Channel`` to the + # parent actor + yield i + await trio.sleep(0.01) + + +async def main(): + # stream for at most 1 seconds + with trio.move_on_after(1) as cancel_scope: + async with tractor.open_nursery() as n: + portal = await n.start_actor( + f'donny', + rpc_module_paths=[__name__], + ) + + # this async for loop streams values from the above + # async generator running in a separate process + async for letter in await portal.run(__name__, 'stream_forever'): + print(letter) + + # we support trio's cancellation system + assert cancel_scope.cancelled_caught + assert n.cancelled + + +if __name__ == '__main__': + tractor.run(main, start_method='forkserver') diff --git a/examples/full_fledged_streaming_service.py b/examples/full_fledged_streaming_service.py new file mode 100644 index 0000000..e86c4ed --- /dev/null +++ b/examples/full_fledged_streaming_service.py @@ -0,0 +1,96 @@ +import time +import trio +import tractor + + +# this is the first 2 actors, streamer_1 and streamer_2 +async def stream_data(seed): + for i in range(seed): + yield i + await trio.sleep(0) # trigger scheduler + + +# this is the third actor; the aggregator +async def aggregate(seed): + """Ensure that the two streams we receive match but only stream + a single set of values to the parent. + """ + async with tractor.open_nursery() as nursery: + portals = [] + for i in range(1, 3): + # fork point + portal = await nursery.start_actor( + name=f'streamer_{i}', + rpc_module_paths=[__name__], + ) + + portals.append(portal) + + send_chan, recv_chan = trio.open_memory_channel(500) + + async def push_to_chan(portal, send_chan): + async with send_chan: + async for value in await portal.run( + __name__, 'stream_data', seed=seed + ): + # leverage trio's built-in backpressure + await send_chan.send(value) + + print(f"FINISHED ITERATING {portal.channel.uid}") + + # spawn 2 trio tasks to collect streams and push to a local queue + async with trio.open_nursery() as n: + + for portal in portals: + n.start_soon(push_to_chan, portal, send_chan.clone()) + + # close this local task's reference to send side + await send_chan.aclose() + + unique_vals = set() + async with recv_chan: + async for value in recv_chan: + if value not in unique_vals: + unique_vals.add(value) + # yield upwards to the spawning parent actor + yield value + + assert value in unique_vals + + print("FINISHED ITERATING in aggregator") + + await nursery.cancel() + print("WAITING on `ActorNursery` to finish") + print("AGGREGATOR COMPLETE!") + + +# this is the main actor and *arbiter* +async def main(): + # a nursery which spawns "actors" + async with tractor.open_nursery() as nursery: + + seed = int(1e3) + import time + pre_start = time.time() + + portal = await nursery.run_in_actor( + 'aggregator', + aggregate, + seed=seed, + ) + + start = time.time() + # the portal call returns exactly what you'd expect + # as if the remote "aggregate" function was called locally + result_stream = [] + async for value in await portal.result(): + result_stream.append(value) + + print(f"STREAM TIME = {time.time() - start}") + print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") + assert result_stream == list(range(seed)) + return result_stream + + +if __name__ == '__main__': + final_stream = tractor.run(main, arbiter_addr=('127.0.0.1', 1616)) diff --git a/examples/service_discovery.py b/examples/service_discovery.py new file mode 100644 index 0000000..9088986 --- /dev/null +++ b/examples/service_discovery.py @@ -0,0 +1,20 @@ +import tractor + +tractor.log.get_console_log("INFO") + +async def main(service_name): + + async with tractor.open_nursery() as an: + await an.start_actor(service_name) + + async with tractor.get_arbiter('127.0.0.1', 1616) as portal: + print(f"Arbiter is listening on {portal.channel}") + + async with tractor.wait_for_actor(service_name) as sockaddr: + print(f"my_service is found at {sockaddr}") + + await an.cancel() + + +if __name__ == '__main__': + tractor.run(main, 'some_actor_name') diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index c118ee1..c6bce2e 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -11,7 +11,6 @@ import shutil import pytest -@pytest.fixture(scope='session') def confdir(): dirname = os.path.dirname dirpath = os.path.abspath( @@ -20,13 +19,12 @@ def confdir(): return dirpath -@pytest.fixture -def examples_dir(confdir): - return os.path.join(confdir, 'examples') +def examples_dir(): + return os.path.join(confdir(), 'examples') @pytest.fixture -def run_example_in_subproc(loglevel, testdir, arb_addr, examples_dir): +def run_example_in_subproc(loglevel, testdir, arb_addr): @contextmanager def run(script_code): @@ -36,7 +34,7 @@ def run_example_in_subproc(loglevel, testdir, arb_addr, examples_dir): # on windows we need to create a special __main__.py which will # be executed with ``python -m __main__.py`` on windows.. shutil.copyfile( - os.path.join(examples_dir, '__main__.py'), + os.path.join(examples_dir(), '__main__.py'), os.path.join(str(testdir), '__main__.py') ) @@ -75,8 +73,12 @@ def run_example_in_subproc(loglevel, testdir, arb_addr, examples_dir): yield run -def test_example(examples_dir, run_example_in_subproc): - ex_file = os.path.join(examples_dir, 'a_trynamic_first_scene.py') +@pytest.mark.parametrize( + 'example_script', + [f for f in os.listdir(examples_dir()) if '__' not in f], +) +def test_example(run_example_in_subproc, example_script): + ex_file = os.path.join(examples_dir(), example_script) with open(ex_file, 'r') as ex: code = ex.read() From f2030a2714e5430e7583eb881aaf7737826e2298 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Feb 2020 14:59:22 -0500 Subject: [PATCH 8/8] Better document the window's gotcha solution in test code --- examples/__main__.py | 14 +++++++++++--- tests/test_docs_examples.py | 22 +++++++++++++++++----- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/examples/__main__.py b/examples/__main__.py index 7cb2506..c3f7add 100644 --- a/examples/__main__.py +++ b/examples/__main__.py @@ -1,11 +1,19 @@ """ -Needed in windows. -This needs to be the main program as it will be -called '__mp_main__' by the multiprocessing module +Needed on Windows. + +This module is needed as the program entry point for invocation +with ``python -m ``. See the solution from @chrizzFTD +here: + + https://github.com/goodboy/tractor/pull/61#issuecomment-470053512 """ if __name__ == '__main__': import multiprocessing multiprocessing.freeze_support() + # ``tests/test_docs_examples.py::test_example`` will copy each + # script from this examples directory into a module in a new + # temporary dir and name it test_example.py. We import that script + # module here and invoke it's ``main()``. from . import test_example test_example.tractor.run(test_example.main, start_method='spawn') diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index c6bce2e..8776372 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -11,7 +11,9 @@ import shutil import pytest -def confdir(): +def repodir(): + """Return the abspath to the repo directory. + """ dirname = os.path.dirname dirpath = os.path.abspath( dirname(dirname(os.path.realpath(__file__))) @@ -20,7 +22,9 @@ def confdir(): def examples_dir(): - return os.path.join(confdir(), 'examples') + """Return the abspath to the examples directory. + """ + return os.path.join(repodir(), 'examples') @pytest.fixture @@ -32,20 +36,21 @@ def run_example_in_subproc(loglevel, testdir, arb_addr): if platform.system() == 'Windows': # on windows we need to create a special __main__.py which will - # be executed with ``python -m __main__.py`` on windows.. + # be executed with ``python -m `` on windows.. shutil.copyfile( os.path.join(examples_dir(), '__main__.py'), os.path.join(str(testdir), '__main__.py') ) - # drop the ``if __name__ == '__main__'`` guard + # drop the ``if __name__ == '__main__'`` guard from the *NIX + # version of each script script_code = '\n'.join(script_code.splitlines()[:-4]) script_file = testdir.makefile('.py', script_code) # without this, tests hang on windows forever kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP - # run the "libary module" as a script + # run the testdir "libary module" as a script cmdargs = [ sys.executable, '-m', @@ -78,6 +83,13 @@ def run_example_in_subproc(loglevel, testdir, arb_addr): [f for f in os.listdir(examples_dir()) if '__' not in f], ) def test_example(run_example_in_subproc, example_script): + """Load and run scripts from this repo's ``examples/`` dir as a user + would copy and pasing them into their editor. + + On windows a little more "finessing" is done to make ``multiprocessing`` play nice: + we copy the ``__main__.py`` into the test directory and invoke the script as a module + with ``python -m test_example``. + """ ex_file = os.path.join(examples_dir(), example_script) with open(ex_file, 'r') as ex: code = ex.read()