diff --git a/tests/_test.py b/tests/_test.py new file mode 100644 index 0000000..e7ac0b1 --- /dev/null +++ b/tests/_test.py @@ -0,0 +1,23 @@ +import tractor + +from tractor.log import get_console_log + +log = get_console_log('trace') + +def cellar_door(): + return "Dang that's beautiful" + + +async def test_most_beautiful_word(): + """The main ``tractor`` routine. + """ + async with tractor.open_nursery() as n: + + portal = await n.run_in_actor('some_linguist', cellar_door) + + # The ``async with`` will unblock here since the 'some_linguist' + # actor has completed its main task ``cellar_door``. + + print(await portal.result()) + +tractor.run(test_most_beautiful_word) \ No newline at end of file diff --git a/tractor/_actor.py b/tractor/_actor.py index 81f05cd..4f7a15c 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -186,7 +186,7 @@ class Actor: statespace: Optional[Dict[str, Any]] = None, uid: str = None, loglevel: str = None, - arbiter_addr: Optional[Tuple[str, int]] = None, + arbiter_addr: Optional[Tuple[str, int]] = None ) -> None: """This constructor is called in the parent actor **before** the spawning phase (aka before a new process is executed). @@ -203,6 +203,8 @@ class Actor: mod = importlib.import_module(name) mods[name] = _get_mod_abspath(mod) + log.debug(f"{name} Loaded RPC modules: {mods}") + self.rpc_module_paths = mods self._mods: Dict[str, ModuleType] = {} @@ -594,7 +596,7 @@ class Actor: self.load_modules() # register with the arbiter if we're told its addr - log.debug(f"Registering {self} for role `{self.name}`") + log.debug(f"Registering {self} for role `{self.name}` @ {arbiter_addr}") assert isinstance(arbiter_addr, tuple) async with get_arbiter(*arbiter_addr) as arb_portal: await arb_portal.run( diff --git a/tractor/_child.py b/tractor/_child.py index b4d1d60..2cd961f 100644 --- a/tractor/_child.py +++ b/tractor/_child.py @@ -1,6 +1,59 @@ + import sys import trio -import cloudpickle +import argparse + +from ._actor import Actor +from ._entry import _trio_main + + +"""This is the "bootloader" for actors started using the native trio backend +added in #128 +""" + if __name__ == "__main__": - trio.run(cloudpickle.load(sys.stdin.buffer)) + + parser = argparse.ArgumentParser() + + parser.add_argument("name") + parser.add_argument("rpc_module_paths") # comma separated mod paths + parser.add_argument("uid") + parser.add_argument("loglevel") + parser.add_argument("bind_addr") + parser.add_argument("parent_addr") + parser.add_argument("arbiter_addr") + + args = parser.parse_args() + + rpc_paths = [] + for rpc_mod_path in args.rpc_module_paths.split(";"): + rpc_paths.append(rpc_mod_path) + + bind_addr = args.bind_addr.split(":") + bind_addr = (bind_addr[0], int(bind_addr[1])) + + parent_addr = args.parent_addr.split(":") + parent_addr = (parent_addr[0], int(parent_addr[1])) + + arbiter_addr = args.arbiter_addr.split(":") + arbiter_addr = (arbiter_addr[0], int(arbiter_addr[1])) + + if args.loglevel == "None": + loglevel = None + else: + loglevel = args.loglevel + + subactor = Actor( + args.name, + rpc_module_paths=rpc_paths, + uid=args.uid, + loglevel=loglevel, + arbiter_addr=arbiter_addr + ) + + _trio_main( + subactor, + bind_addr, + parent_addr=parent_addr + ) diff --git a/tractor/_entry.py b/tractor/_entry.py index 82f814c..4f80a3f 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -52,15 +52,12 @@ def _mp_main( log.info(f"Actor {actor.uid} terminated") -async def _trio_main( +def _trio_main( actor: 'Actor', accept_addr: Tuple[str, int], parent_addr: Tuple[str, int] = None ) -> None: - """Entry point for a `trio_run_in_process` subactor. - - Here we don't need to call `trio.run()` since trip does that as - part of its subprocess startup sequence. + """Entry point for a `trio` based subactor. """ if actor.loglevel is not None: log.info( @@ -71,5 +68,14 @@ async def _trio_main( _state._current_actor = actor - await actor._async_main(accept_addr, parent_addr=parent_addr) + trio_main = partial( + actor._async_main, + accept_addr, + parent_addr=parent_addr + ) + + try: + trio.run(trio_main) + except KeyboardInterrupt: + pass log.info(f"Actor {actor.uid} terminated") diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 5642083..3fbd0c6 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -7,10 +7,8 @@ import subprocess import multiprocessing as mp import platform from typing import Any, Dict, Optional -from functools import partial import trio -import cloudpickle from trio_typing import TaskStatus from async_generator import aclosing, asynccontextmanager @@ -25,7 +23,7 @@ except ImportError: from multiprocessing import forkserver # type: ignore from typing import Tuple -from . import _forkserver_override +from . import _forkserver_override, _child from ._state import current_actor from .log import get_logger from ._portal import Portal @@ -158,26 +156,32 @@ async def cancel_on_completion( @asynccontextmanager -async def run_in_process(subactor, async_fn, *args, **kwargs): - encoded_job = cloudpickle.dumps(partial(async_fn, *args, **kwargs)) +async def spawn_subactor( + subactor: Actor, + bind_addr: Tuple[str, int], + parent_addr: Tuple[str, int] +): + + spawn_cmd = [ + sys.executable, + "-m", + _child.__name__, + subactor.name, + ";".join( + [mod_path for mod_path in subactor.rpc_module_paths] + ), + subactor.uid[1], + subactor.loglevel or "None", + f"{bind_addr[0]}:{bind_addr[1]}", + f"{parent_addr[0]}:{parent_addr[1]}", + f"{subactor._arb_addr[0]}:{subactor._arb_addr[1]}", + ] - async with await trio.open_process( - [ - sys.executable, - "-m", - # Hardcode this (instead of using ``_child.__name__`` to avoid a - # double import warning: https://stackoverflow.com/a/45070583 - "tractor._child", - # This is merely an identifier for debugging purposes when - # viewing the process tree from the OS - str(subactor.uid), - ], - stdin=subprocess.PIPE, - ) as proc: + log.info(f"Spawn actor with cmd: {spawn_cmd}") - # send func object to call in child - await proc.stdin.send_all(encoded_job) - yield proc + proc = await trio.open_process(spawn_cmd) + + yield proc async def new_proc( @@ -202,9 +206,7 @@ async def new_proc( async with trio.open_nursery() as nursery: if use_trio_run_in_process or _spawn_method == 'trio': - async with run_in_process( - subactor, - _trio_main, + async with spawn_subactor( subactor, bind_addr, parent_addr,