forked from goodboy/tractor
1
0
Fork 0

Support asyncio actors with the trio spawner backend

msgspec_infect_asyncio
Tyler Goodlet 2020-07-27 11:03:17 -04:00
parent 86089800ab
commit 25c19b9274
3 changed files with 23 additions and 3 deletions

View File

@ -19,12 +19,15 @@ def parse_ipaddr(arg):
return (str(host), int(port)) return (str(host), int(port))
from ._entry import _trio_main
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--uid", type=parse_uid) parser.add_argument("--uid", type=parse_uid)
parser.add_argument("--loglevel", type=str) parser.add_argument("--loglevel", type=str)
parser.add_argument("--parent_addr", type=parse_ipaddr) parser.add_argument("--parent_addr", type=parse_ipaddr)
parser.add_argument("--asyncio", action='store_true')
args = parser.parse_args() args = parser.parse_args()
subactor = Actor( subactor = Actor(
@ -36,5 +39,6 @@ if __name__ == "__main__":
_trio_main( _trio_main(
subactor, subactor,
parent_addr=args.parent_addr parent_addr=args.parent_addr,
) infect_asyncio=args.asyncio,
)

View File

@ -21,6 +21,7 @@ def _mp_main(
forkserver_info: Tuple[Any, Any, Any, Any, Any], forkserver_info: Tuple[Any, Any, Any, Any, Any],
start_method: str, start_method: str,
parent_addr: Tuple[str, int] = None, parent_addr: Tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None: ) -> None:
"""The routine called *after fork* which invokes a fresh ``trio.run`` """The routine called *after fork* which invokes a fresh ``trio.run``
""" """
@ -62,6 +63,7 @@ def _trio_main(
actor: 'Actor', # type: ignore actor: 'Actor', # type: ignore
*, *,
parent_addr: Tuple[str, int] = None, parent_addr: Tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None: ) -> None:
"""Entry point for a `trio_run_in_process` subactor. """Entry point for a `trio_run_in_process` subactor.
""" """
@ -71,6 +73,8 @@ def _trio_main(
log.info(f"Started new trio process for {actor.uid}") log.info(f"Started new trio process for {actor.uid}")
log.info(f"Started new trio process for {actor.uid}")
if actor.loglevel is not None: if actor.loglevel is not None:
log.info( log.info(
f"Setting loglevel for {actor.uid} to {actor.loglevel}") f"Setting loglevel for {actor.uid} to {actor.loglevel}")
@ -88,7 +92,11 @@ def _trio_main(
) )
try: try:
trio.run(trio_main) if infect_asyncio:
actor._infected_aio = True
run_as_asyncio_guest(trio_main)
else:
trio.run(trio_main)
except KeyboardInterrupt: except KeyboardInterrupt:
log.warning(f"Actor {actor.uid} received KBI") log.warning(f"Actor {actor.uid} received KBI")

View File

@ -179,6 +179,7 @@ async def do_hard_kill(
async def spawn_subactor( async def spawn_subactor(
subactor: 'Actor', subactor: 'Actor',
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
infect_asyncio: bool,
): ):
spawn_cmd = [ spawn_cmd = [
sys.executable, sys.executable,
@ -203,6 +204,10 @@ async def spawn_subactor(
subactor.loglevel subactor.loglevel
] ]
# Tell child to run in guest mode on top of ``asyncio`` loop
if infect_asyncio:
spawn_cmd.append("--asyncio")
proc = await trio.open_process(spawn_cmd) proc = await trio.open_process(spawn_cmd)
try: try:
yield proc yield proc
@ -227,6 +232,7 @@ async def new_proc(
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child _runtime_vars: Dict[str, Any], # serialized and sent to _child
*, *,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None:
"""Create a new ``multiprocessing.Process`` using the """Create a new ``multiprocessing.Process`` using the
@ -242,6 +248,7 @@ async def new_proc(
async with spawn_subactor( async with spawn_subactor(
subactor, subactor,
parent_addr, parent_addr,
infect_asyncio=infect_asyncio
) as proc: ) as proc:
log.runtime(f"Started {proc}") log.runtime(f"Started {proc}")
@ -384,6 +391,7 @@ async def mp_new_proc(
fs_info, fs_info,
start_method, start_method,
parent_addr, parent_addr,
infect_asyncio,
), ),
# daemon=True, # daemon=True,
name=name, name=name,