Support asyncio actors with the trio spawner backend

infect_asyncio
Tyler Goodlet 2020-07-27 11:03:17 -04:00
parent 1406ddc5ee
commit 8070b16bd0
3 changed files with 19 additions and 4 deletions

View File

@ -37,12 +37,15 @@ def parse_ipaddr(arg):
return (str(host), int(port))
from ._entry import _trio_main
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--uid", type=parse_uid)
parser.add_argument("--loglevel", type=str)
parser.add_argument("--parent_addr", type=parse_ipaddr)
parser.add_argument("--asyncio", action='store_true')
args = parser.parse_args()
subactor = Actor(
@ -54,5 +57,6 @@ if __name__ == "__main__":
_trio_main(
subactor,
parent_addr=args.parent_addr
)
parent_addr=args.parent_addr,
infect_asyncio=args.asyncio,
)

View File

@ -38,6 +38,7 @@ def _mp_main(
forkserver_info: Tuple[Any, Any, Any, Any, Any],
start_method: str,
parent_addr: Tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None:
"""The routine called *after fork* which invokes a fresh ``trio.run``
"""
@ -79,6 +80,7 @@ def _trio_main(
actor: 'Actor', # type: ignore
*,
parent_addr: Tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None:
"""Entry point for a `trio_run_in_process` subactor.
"""
@ -88,6 +90,8 @@ def _trio_main(
log.info(f"Started new trio process for {actor.uid}")
log.info(f"Started new trio process for {actor.uid}")
if actor.loglevel is not None:
log.info(
f"Setting loglevel for {actor.uid} to {actor.loglevel}")
@ -105,7 +109,11 @@ def _trio_main(
)
try:
trio.run(trio_main)
if infect_asyncio:
actor._infected_aio = True
run_as_asyncio_guest(trio_main)
else:
trio.run(trio_main)
except KeyboardInterrupt:
log.warning(f"Actor {actor.uid} received KBI")

View File

@ -244,6 +244,7 @@ async def new_proc(
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
@ -260,7 +261,6 @@ async def new_proc(
uid = subactor.uid
if _spawn_method == 'trio':
spawn_cmd = [
sys.executable,
"-m",
@ -283,6 +283,9 @@ async def new_proc(
"--loglevel",
subactor.loglevel
]
# Tell child to run in guest mode on top of ``asyncio`` loop
if infect_asyncio:
spawn_cmd.append("--asyncio")
cancelled_during_spawn: bool = False
proc: Optional[trio.Process] = None