From 1f3de884223b8bcc7e1c8219a4f510ca9cb2ec84 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 27 Jul 2020 11:03:17 -0400 Subject: [PATCH] Support asyncio actors with the trio spawner backend --- tractor/_child.py | 8 ++++++-- tractor/_entry.py | 10 +++++++++- tractor/_spawn.py | 8 ++++++++ 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/tractor/_child.py b/tractor/_child.py index f145824..df50e38 100644 --- a/tractor/_child.py +++ b/tractor/_child.py @@ -19,12 +19,15 @@ def parse_ipaddr(arg): return (str(host), int(port)) +from ._entry import _trio_main + if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--uid", type=parse_uid) parser.add_argument("--loglevel", type=str) parser.add_argument("--parent_addr", type=parse_ipaddr) + parser.add_argument("--asyncio", action='store_true') args = parser.parse_args() subactor = Actor( @@ -36,5 +39,6 @@ if __name__ == "__main__": _trio_main( subactor, - parent_addr=args.parent_addr - ) \ No newline at end of file + parent_addr=args.parent_addr, + infect_asyncio=args.asyncio, + ) diff --git a/tractor/_entry.py b/tractor/_entry.py index 521d148..79b1437 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -20,6 +20,7 @@ def _mp_main( forkserver_info: Tuple[Any, Any, Any, Any, Any], start_method: str, parent_addr: Tuple[str, int] = None, + infect_asyncio: bool = False, ) -> None: """The routine called *after fork* which invokes a fresh ``trio.run`` """ @@ -61,12 +62,15 @@ def _trio_main( actor: 'Actor', # type: ignore # noqa *, parent_addr: Tuple[str, int] = None, + infect_asyncio: bool = False, ) -> None: """Entry point for a `trio_run_in_process` subactor. """ log.info(f"Started new trio process for {actor.uid}") + log.info(f"Started new trio process for {actor.uid}") + if actor.loglevel is not None: log.info( f"Setting loglevel for {actor.uid} to {actor.loglevel}") @@ -84,7 +88,11 @@ def _trio_main( ) try: - trio.run(trio_main) + if infect_asyncio: + actor._infected_aio = True + run_as_asyncio_guest(trio_main) + else: + trio.run(trio_main) except KeyboardInterrupt: log.warning(f"Actor {actor.uid} received KBI") diff --git a/tractor/_spawn.py b/tractor/_spawn.py index a4feb54..0fc9dbb 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -159,6 +159,7 @@ async def cancel_on_completion( async def spawn_subactor( subactor: 'Actor', parent_addr: Tuple[str, int], + infect_asyncio: bool, ): spawn_cmd = [ sys.executable, @@ -183,6 +184,10 @@ async def spawn_subactor( subactor.loglevel ] + # Tell child to run in guest mode on top of ``asyncio`` loop + if infect_asyncio: + spawn_cmd.append("--asyncio") + proc = await trio.open_process(spawn_cmd) try: yield proc @@ -240,6 +245,7 @@ async def new_proc( parent_addr: Tuple[str, int], _runtime_vars: Dict[str, Any], # serialized and sent to _child *, + infect_asyncio: bool = False, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: """Create a new ``multiprocessing.Process`` using the @@ -255,6 +261,7 @@ async def new_proc( async with spawn_subactor( subactor, parent_addr, + infect_asyncio=infect_asyncio ) as proc: log.info(f"Started {proc}") @@ -392,6 +399,7 @@ async def mp_new_proc( fs_info, start_method, parent_addr, + infect_asyncio, ), # daemon=True, name=name,