diff --git a/README.rst b/README.rst index 2dabf6b..6664828 100644 --- a/README.rst +++ b/README.rst @@ -108,7 +108,6 @@ the hip new film we're shooting: .. code:: python import tractor - from functools import partial _this_module = __name__ the_line = 'Hi my name is {}' diff --git a/examples/__main__.py b/examples/__main__.py new file mode 100644 index 0000000..c3f7add --- /dev/null +++ b/examples/__main__.py @@ -0,0 +1,19 @@ +""" +Needed on Windows. + +This module is needed as the program entry point for invocation +with ``python -m ``. See the solution from @chrizzFTD +here: + + https://github.com/goodboy/tractor/pull/61#issuecomment-470053512 + +""" +if __name__ == '__main__': + import multiprocessing + multiprocessing.freeze_support() + # ``tests/test_docs_examples.py::test_example`` will copy each + # script from this examples directory into a module in a new + # temporary dir and name it test_example.py. We import that script + # module here and invoke it's ``main()``. + from . import test_example + test_example.tractor.run(test_example.main, start_method='spawn') diff --git a/examples/a_trynamic_first_scene.py b/examples/a_trynamic_first_scene.py new file mode 100644 index 0000000..8197dc8 --- /dev/null +++ b/examples/a_trynamic_first_scene.py @@ -0,0 +1,44 @@ +import platform +import tractor + +_this_module = __name__ +the_line = 'Hi my name is {}' + + +tractor.log.get_console_log("INFO") + + +async def hi(): + return the_line.format(tractor.current_actor().name) + + +async def say_hello(other_actor): + async with tractor.wait_for_actor(other_actor) as portal: + return await portal.run(_this_module, 'hi') + + +async def main(): + """Main tractor entry point, the "master" process (for now + acts as the "director"). + """ + async with tractor.open_nursery() as n: + print("Alright... Action!") + + donny = await n.run_in_actor( + 'donny', + say_hello, + # arguments are always named + other_actor='gretchen', + ) + gretchen = await n.run_in_actor( + 'gretchen', + say_hello, + other_actor='donny', + ) + print(await gretchen.result()) + print(await donny.result()) + print("CUTTTT CUUTT CUT!!! Donny!! You're supposed to say...") + + +if __name__ == '__main__': + tractor.run(main) diff --git a/examples/actor_spawning_and_causality.py b/examples/actor_spawning_and_causality.py new file mode 100644 index 0000000..fc0d7c7 --- /dev/null +++ b/examples/actor_spawning_and_causality.py @@ -0,0 +1,22 @@ +import tractor + + +def cellar_door(): + return "Dang that's beautiful" + + +async def main(): + """The main ``tractor`` routine. + """ + async with tractor.open_nursery() as n: + + portal = await n.run_in_actor('some_linguist', cellar_door) + + # The ``async with`` will unblock here since the 'some_linguist' + # actor has completed its main task ``cellar_door``. + + print(await portal.result()) + + +if __name__ == '__main__': + tractor.run(main) diff --git a/examples/asynchronous_generators.py b/examples/asynchronous_generators.py new file mode 100644 index 0000000..7ca0beb --- /dev/null +++ b/examples/asynchronous_generators.py @@ -0,0 +1,36 @@ +from itertools import repeat +import trio +import tractor + +tractor.log.get_console_log("INFO") + + +async def stream_forever(): + for i in repeat("I can see these little future bubble things"): + # each yielded value is sent over the ``Channel`` to the + # parent actor + yield i + await trio.sleep(0.01) + + +async def main(): + # stream for at most 1 seconds + with trio.move_on_after(1) as cancel_scope: + async with tractor.open_nursery() as n: + portal = await n.start_actor( + f'donny', + rpc_module_paths=[__name__], + ) + + # this async for loop streams values from the above + # async generator running in a separate process + async for letter in await portal.run(__name__, 'stream_forever'): + print(letter) + + # we support trio's cancellation system + assert cancel_scope.cancelled_caught + assert n.cancelled + + +if __name__ == '__main__': + tractor.run(main, start_method='forkserver') diff --git a/examples/full_fledged_streaming_service.py b/examples/full_fledged_streaming_service.py new file mode 100644 index 0000000..e86c4ed --- /dev/null +++ b/examples/full_fledged_streaming_service.py @@ -0,0 +1,96 @@ +import time +import trio +import tractor + + +# this is the first 2 actors, streamer_1 and streamer_2 +async def stream_data(seed): + for i in range(seed): + yield i + await trio.sleep(0) # trigger scheduler + + +# this is the third actor; the aggregator +async def aggregate(seed): + """Ensure that the two streams we receive match but only stream + a single set of values to the parent. + """ + async with tractor.open_nursery() as nursery: + portals = [] + for i in range(1, 3): + # fork point + portal = await nursery.start_actor( + name=f'streamer_{i}', + rpc_module_paths=[__name__], + ) + + portals.append(portal) + + send_chan, recv_chan = trio.open_memory_channel(500) + + async def push_to_chan(portal, send_chan): + async with send_chan: + async for value in await portal.run( + __name__, 'stream_data', seed=seed + ): + # leverage trio's built-in backpressure + await send_chan.send(value) + + print(f"FINISHED ITERATING {portal.channel.uid}") + + # spawn 2 trio tasks to collect streams and push to a local queue + async with trio.open_nursery() as n: + + for portal in portals: + n.start_soon(push_to_chan, portal, send_chan.clone()) + + # close this local task's reference to send side + await send_chan.aclose() + + unique_vals = set() + async with recv_chan: + async for value in recv_chan: + if value not in unique_vals: + unique_vals.add(value) + # yield upwards to the spawning parent actor + yield value + + assert value in unique_vals + + print("FINISHED ITERATING in aggregator") + + await nursery.cancel() + print("WAITING on `ActorNursery` to finish") + print("AGGREGATOR COMPLETE!") + + +# this is the main actor and *arbiter* +async def main(): + # a nursery which spawns "actors" + async with tractor.open_nursery() as nursery: + + seed = int(1e3) + import time + pre_start = time.time() + + portal = await nursery.run_in_actor( + 'aggregator', + aggregate, + seed=seed, + ) + + start = time.time() + # the portal call returns exactly what you'd expect + # as if the remote "aggregate" function was called locally + result_stream = [] + async for value in await portal.result(): + result_stream.append(value) + + print(f"STREAM TIME = {time.time() - start}") + print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") + assert result_stream == list(range(seed)) + return result_stream + + +if __name__ == '__main__': + final_stream = tractor.run(main, arbiter_addr=('127.0.0.1', 1616)) diff --git a/examples/service_discovery.py b/examples/service_discovery.py new file mode 100644 index 0000000..9088986 --- /dev/null +++ b/examples/service_discovery.py @@ -0,0 +1,20 @@ +import tractor + +tractor.log.get_console_log("INFO") + +async def main(service_name): + + async with tractor.open_nursery() as an: + await an.start_actor(service_name) + + async with tractor.get_arbiter('127.0.0.1', 1616) as portal: + print(f"Arbiter is listening on {portal.channel}") + + async with tractor.wait_for_actor(service_name) as sockaddr: + print(f"my_service is found at {sockaddr}") + + await an.cancel() + + +if __name__ == '__main__': + tractor.run(main, 'some_actor_name') diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py new file mode 100644 index 0000000..8776372 --- /dev/null +++ b/tests/test_docs_examples.py @@ -0,0 +1,105 @@ +""" +Let's make sure them docs work yah? +""" +from contextlib import contextmanager +import os +import sys +import subprocess +import platform +import shutil + +import pytest + + +def repodir(): + """Return the abspath to the repo directory. + """ + dirname = os.path.dirname + dirpath = os.path.abspath( + dirname(dirname(os.path.realpath(__file__))) + ) + return dirpath + + +def examples_dir(): + """Return the abspath to the examples directory. + """ + return os.path.join(repodir(), 'examples') + + +@pytest.fixture +def run_example_in_subproc(loglevel, testdir, arb_addr): + + @contextmanager + def run(script_code): + kwargs = dict() + + if platform.system() == 'Windows': + # on windows we need to create a special __main__.py which will + # be executed with ``python -m `` on windows.. + shutil.copyfile( + os.path.join(examples_dir(), '__main__.py'), + os.path.join(str(testdir), '__main__.py') + ) + + # drop the ``if __name__ == '__main__'`` guard from the *NIX + # version of each script + script_code = '\n'.join(script_code.splitlines()[:-4]) + script_file = testdir.makefile('.py', script_code) + + # without this, tests hang on windows forever + kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP + + # run the testdir "libary module" as a script + cmdargs = [ + sys.executable, + '-m', + # use the "module name" of this "package" + 'test_example' + ] + else: + script_file = testdir.makefile('.py', script_code) + cmdargs = [ + sys.executable, + str(script_file), + ] + + proc = testdir.popen( + cmdargs, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + **kwargs, + ) + assert not proc.returncode + yield proc + proc.wait() + assert proc.returncode == 0 + + yield run + + +@pytest.mark.parametrize( + 'example_script', + [f for f in os.listdir(examples_dir()) if '__' not in f], +) +def test_example(run_example_in_subproc, example_script): + """Load and run scripts from this repo's ``examples/`` dir as a user + would copy and pasing them into their editor. + + On windows a little more "finessing" is done to make ``multiprocessing`` play nice: + we copy the ``__main__.py`` into the test directory and invoke the script as a module + with ``python -m test_example``. + """ + ex_file = os.path.join(examples_dir(), example_script) + with open(ex_file, 'r') as ex: + code = ex.read() + + with run_example_in_subproc(code) as proc: + proc.wait() + err, _ = proc.stderr.read(), proc.stdout.read() + + # if we get some gnarly output let's aggregate and raise + if err and b'Error' in err: + raise Exception(err.decode()) + + assert proc.returncode == 0 diff --git a/tractor/_actor.py b/tractor/_actor.py index 7118a3e..443c48c 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -270,7 +270,10 @@ class Actor: # should allow for relative (at least downward) imports. sys.path.append(os.path.dirname(filepath)) log.debug(f"Attempting to import {modpath}@{filepath}") - self._mods[modpath] = importlib.import_module(modpath) + mod = importlib.import_module(modpath) + self._mods[modpath] = mod + if modpath == '__main__': + self._mods['__mp_main__'] = mod except ModuleNotFoundError: # it is expected the corresponding `ModuleNotExposed` error # will be raised later @@ -278,11 +281,6 @@ class Actor: raise def _get_rpc_func(self, ns, funcname): - if ns == "__mp_main__": - # In subprocesses, `__main__` will actually map to - # `__mp_main__` which should be the same entry-point-module - # as the parent. - ns = "__main__" try: return getattr(self._mods[ns], funcname) except KeyError as err: diff --git a/tractor/_discovery.py b/tractor/_discovery.py index ae2e3be..3667c90 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -5,7 +5,7 @@ import typing from typing import Tuple, Optional, Union from async_generator import asynccontextmanager -from ._ipc import _connect_chan +from ._ipc import _connect_chan, Channel from ._portal import ( Portal, open_portal, @@ -16,7 +16,8 @@ from ._state import current_actor @asynccontextmanager async def get_arbiter( - host: str, port: int + host: str, + port: int, ) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: """Return a portal instance connected to a local or remote arbiter. @@ -28,7 +29,7 @@ async def get_arbiter( if actor.is_arbiter: # we're already the arbiter # (likely a re-entrant call from the arbiter actor) - yield LocalPortal(actor) + yield LocalPortal(actor, Channel((host, port))) else: async with _connect_chan(host, port) as chan: async with open_portal(chan) as arb_portal: diff --git a/tractor/_portal.py b/tractor/_portal.py index 8f5899b..89cab39 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -298,6 +298,7 @@ class LocalPortal: using an in process actor instance. """ actor: 'Actor' # type: ignore + channel: Channel async def run(self, ns: str, func_name: str, **kwargs) -> Any: """Run a requested function locally and return it's result. diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 9421d21..5700131 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -30,20 +30,21 @@ from ._actor import Actor, ActorFailure log = get_logger('tractor') -# use trip as our default on *nix systems for now -if platform.system() != 'Windows': - _spawn_method: str = "trio_run_in_process" -else: - _spawn_method = "spawn" - +# placeholder for an mp start context if so using that backend _ctx: Optional[mp.context.BaseContext] = None +_spawn_method: str = "spawn" if platform.system() == 'Windows': + _spawn_method = "spawn" + _ctx = mp.get_context("spawn") + async def proc_waiter(proc: mp.Process) -> None: await trio.hazmat.WaitForSingleObject(proc.sentinel) else: + # *NIX systems use ``trio_run_in_process` as our default (for now) import trio_run_in_process + _spawn_method = "trio_run_in_process" async def proc_waiter(proc: mp.Process) -> None: await trio.hazmat.wait_readable(proc.sentinel) @@ -53,9 +54,9 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: """Attempt to set the start method for process starting, aka the "actor spawning backend". - If the desired method is not supported this function will error. On Windows - the only supported option is the ``multiprocessing`` "spawn" method. The default - on *nix systems is ``trio_run_in_process``. + If the desired method is not supported this function will error. On + Windows the only supported option is the ``multiprocessing`` "spawn" + method. The default on *nix systems is ``trio_run_in_process``. """ global _ctx global _spawn_method