forked from goodboy/tractor
Support asyncio actors with the trio spawner backend
parent
fa455f9c24
commit
47074209a1
|
@ -19,12 +19,15 @@ def parse_ipaddr(arg):
|
||||||
return (str(host), int(port))
|
return (str(host), int(port))
|
||||||
|
|
||||||
|
|
||||||
|
from ._entry import _trio_main
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
parser.add_argument("--uid", type=parse_uid)
|
parser.add_argument("--uid", type=parse_uid)
|
||||||
parser.add_argument("--loglevel", type=str)
|
parser.add_argument("--loglevel", type=str)
|
||||||
parser.add_argument("--parent_addr", type=parse_ipaddr)
|
parser.add_argument("--parent_addr", type=parse_ipaddr)
|
||||||
|
parser.add_argument("--asyncio", action='store_true')
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
subactor = Actor(
|
subactor = Actor(
|
||||||
|
@ -36,5 +39,6 @@ if __name__ == "__main__":
|
||||||
|
|
||||||
_trio_main(
|
_trio_main(
|
||||||
subactor,
|
subactor,
|
||||||
parent_addr=args.parent_addr
|
parent_addr=args.parent_addr,
|
||||||
|
infect_asyncio=args.asyncio,
|
||||||
)
|
)
|
|
@ -22,6 +22,7 @@ def _mp_main(
|
||||||
forkserver_info: Tuple[Any, Any, Any, Any, Any],
|
forkserver_info: Tuple[Any, Any, Any, Any, Any],
|
||||||
start_method: str,
|
start_method: str,
|
||||||
parent_addr: Tuple[str, int] = None,
|
parent_addr: Tuple[str, int] = None,
|
||||||
|
infect_asyncio: bool = False,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""The routine called *after fork* which invokes a fresh ``trio.run``
|
"""The routine called *after fork* which invokes a fresh ``trio.run``
|
||||||
"""
|
"""
|
||||||
|
@ -59,7 +60,9 @@ def _mp_main(
|
||||||
|
|
||||||
def _trio_main(
|
def _trio_main(
|
||||||
actor: 'Actor',
|
actor: 'Actor',
|
||||||
parent_addr: Tuple[str, int] = None
|
*,
|
||||||
|
parent_addr: Tuple[str, int] = None,
|
||||||
|
infect_asyncio: bool = False,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Entry point for a `trio_run_in_process` subactor.
|
"""Entry point for a `trio_run_in_process` subactor.
|
||||||
"""
|
"""
|
||||||
|
@ -70,6 +73,8 @@ def _trio_main(
|
||||||
# TODO: make a global func to set this or is it too hacky?
|
# TODO: make a global func to set this or is it too hacky?
|
||||||
# os.environ['PYTHONBREAKPOINT'] = 'tractor._debug.breakpoint'
|
# os.environ['PYTHONBREAKPOINT'] = 'tractor._debug.breakpoint'
|
||||||
|
|
||||||
|
log.info(f"Started new trio process for {actor.uid}")
|
||||||
|
|
||||||
if actor.loglevel is not None:
|
if actor.loglevel is not None:
|
||||||
log.info(
|
log.info(
|
||||||
f"Setting loglevel for {actor.uid} to {actor.loglevel}")
|
f"Setting loglevel for {actor.uid} to {actor.loglevel}")
|
||||||
|
@ -87,7 +92,11 @@ def _trio_main(
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
trio.run(trio_main)
|
if infect_asyncio:
|
||||||
|
actor._infected_aio = True
|
||||||
|
run_as_asyncio_guest(trio_main)
|
||||||
|
else:
|
||||||
|
trio.run(trio_main)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
log.warning(f"Actor {actor.uid} received KBI")
|
log.warning(f"Actor {actor.uid} received KBI")
|
||||||
|
|
||||||
|
|
|
@ -157,6 +157,7 @@ async def cancel_on_completion(
|
||||||
async def spawn_subactor(
|
async def spawn_subactor(
|
||||||
subactor: 'Actor',
|
subactor: 'Actor',
|
||||||
parent_addr: Tuple[str, int],
|
parent_addr: Tuple[str, int],
|
||||||
|
infect_asyncio: bool,
|
||||||
):
|
):
|
||||||
spawn_cmd = [
|
spawn_cmd = [
|
||||||
sys.executable,
|
sys.executable,
|
||||||
|
@ -181,6 +182,10 @@ async def spawn_subactor(
|
||||||
subactor.loglevel
|
subactor.loglevel
|
||||||
]
|
]
|
||||||
|
|
||||||
|
# Tell child to run in guest mode on top of ``asyncio`` loop
|
||||||
|
if infect_asyncio:
|
||||||
|
spawn_cmd.append("--asyncio")
|
||||||
|
|
||||||
proc = await trio.open_process(spawn_cmd)
|
proc = await trio.open_process(spawn_cmd)
|
||||||
try:
|
try:
|
||||||
yield proc
|
yield proc
|
||||||
|
@ -217,6 +222,7 @@ async def new_proc(
|
||||||
_runtime_vars: Dict[str, Any], # serialized and sent to _child
|
_runtime_vars: Dict[str, Any], # serialized and sent to _child
|
||||||
*,
|
*,
|
||||||
use_trio_run_in_process: bool = False,
|
use_trio_run_in_process: bool = False,
|
||||||
|
infect_asyncio: bool = False,
|
||||||
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Create a new ``multiprocessing.Process`` using the
|
"""Create a new ``multiprocessing.Process`` using the
|
||||||
|
@ -232,6 +238,7 @@ async def new_proc(
|
||||||
async with spawn_subactor(
|
async with spawn_subactor(
|
||||||
subactor,
|
subactor,
|
||||||
parent_addr,
|
parent_addr,
|
||||||
|
infect_asyncio=infect_asyncio
|
||||||
) as proc:
|
) as proc:
|
||||||
log.info(f"Started {proc}")
|
log.info(f"Started {proc}")
|
||||||
|
|
||||||
|
@ -321,6 +328,7 @@ async def new_proc(
|
||||||
fs_info,
|
fs_info,
|
||||||
start_method,
|
start_method,
|
||||||
parent_addr,
|
parent_addr,
|
||||||
|
infect_asyncio,
|
||||||
),
|
),
|
||||||
# daemon=True,
|
# daemon=True,
|
||||||
name=name,
|
name=name,
|
||||||
|
|
Loading…
Reference in New Issue