diff --git a/tractor/_child.py b/tractor/_child.py index f145824..df50e38 100644 --- a/tractor/_child.py +++ b/tractor/_child.py @@ -19,12 +19,15 @@ def parse_ipaddr(arg): return (str(host), int(port)) +from ._entry import _trio_main + if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--uid", type=parse_uid) parser.add_argument("--loglevel", type=str) parser.add_argument("--parent_addr", type=parse_ipaddr) + parser.add_argument("--asyncio", action='store_true') args = parser.parse_args() subactor = Actor( @@ -36,5 +39,6 @@ if __name__ == "__main__": _trio_main( subactor, - parent_addr=args.parent_addr - ) \ No newline at end of file + parent_addr=args.parent_addr, + infect_asyncio=args.asyncio, + ) diff --git a/tractor/_entry.py b/tractor/_entry.py index 8c9c1ac..9d53889 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -22,6 +22,7 @@ def _mp_main( forkserver_info: Tuple[Any, Any, Any, Any, Any], start_method: str, parent_addr: Tuple[str, int] = None, + infect_asyncio: bool = False, ) -> None: """The routine called *after fork* which invokes a fresh ``trio.run`` """ @@ -59,7 +60,9 @@ def _mp_main( def _trio_main( actor: 'Actor', - parent_addr: Tuple[str, int] = None + *, + parent_addr: Tuple[str, int] = None, + infect_asyncio: bool = False, ) -> None: """Entry point for a `trio_run_in_process` subactor. """ @@ -70,6 +73,8 @@ def _trio_main( # TODO: make a global func to set this or is it too hacky? # os.environ['PYTHONBREAKPOINT'] = 'tractor._debug.breakpoint' + log.info(f"Started new trio process for {actor.uid}") + if actor.loglevel is not None: log.info( f"Setting loglevel for {actor.uid} to {actor.loglevel}") @@ -87,7 +92,11 @@ def _trio_main( ) try: - trio.run(trio_main) + if infect_asyncio: + actor._infected_aio = True + run_as_asyncio_guest(trio_main) + else: + trio.run(trio_main) except KeyboardInterrupt: log.warning(f"Actor {actor.uid} received KBI") diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 2065967..7e44f59 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -157,6 +157,7 @@ async def cancel_on_completion( async def spawn_subactor( subactor: 'Actor', parent_addr: Tuple[str, int], + infect_asyncio: bool, ): spawn_cmd = [ sys.executable, @@ -181,6 +182,10 @@ async def spawn_subactor( subactor.loglevel ] + # Tell child to run in guest mode on top of ``asyncio`` loop + if infect_asyncio: + spawn_cmd.append("--asyncio") + proc = await trio.open_process(spawn_cmd) try: yield proc @@ -217,6 +222,7 @@ async def new_proc( _runtime_vars: Dict[str, Any], # serialized and sent to _child *, use_trio_run_in_process: bool = False, + infect_asyncio: bool = False, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: """Create a new ``multiprocessing.Process`` using the @@ -232,6 +238,7 @@ async def new_proc( async with spawn_subactor( subactor, parent_addr, + infect_asyncio=infect_asyncio ) as proc: log.info(f"Started {proc}") @@ -321,6 +328,7 @@ async def new_proc( fs_info, start_method, parent_addr, + infect_asyncio, ), # daemon=True, name=name,