Support "infected asyncio" actors

This is an initial solution for #120.

Allow spawning `asyncio` based actors which run `trio` in guest
mode. This enables spawning `tractor` actors on top of the `asyncio`
event loop whilst still leveraging the SC focused internal actor
supervision machinery. Add a `tractor.to_syncio.run()` api to allow
spawning tasks on the `asyncio` loop from an embedded (remote) `trio`
task and return or stream results all the way back through the `tractor`
IPC system using a very similar api to portals.

One outstanding problem is getting SC around calls to
`asyncio.create_task()`. Currently a task that crashes isn't able to
easily relay the error to the embedded `trio` task without us fully
enforcing the portals based message protocol (which seems superfluous
given the error ref is in process). Further experiments using `anyio`
task groups may alleviate this.
reorg_entry_points
Tyler Goodlet 2020-06-28 13:10:02 -04:00
parent 2b2cf2e001
commit 8054bc7c70
5 changed files with 221 additions and 14 deletions

View File

@ -20,6 +20,7 @@ from ._state import current_actor
from ._exceptions import RemoteActorError, ModuleNotExposed from ._exceptions import RemoteActorError, ModuleNotExposed
from . import msg from . import msg
from . import _spawn from . import _spawn
from . import to_asyncio
__all__ = [ __all__ = [
@ -35,6 +36,7 @@ __all__ = [
'RemoteActorError', 'RemoteActorError',
'ModuleNotExposed', 'ModuleNotExposed',
'msg' 'msg'
'to_asyncio'
] ]

121
tractor/_entry.py 100644
View File

@ -0,0 +1,121 @@
"""
Process entry points.
"""
import asyncio
from functools import partial
from typing import Tuple, Any, Awaitable
import trio # type: ignore
from ._actor import Actor
from .log import get_console_log, get_logger
from . import _state
__all__ = ('run',)
log = get_logger(__name__)
def _asyncio_main(
trio_main: Awaitable,
) -> None:
"""Entry for an "infected ``asyncio`` actor".
Uh, oh. :o
It looks like your event loop has caught a case of the ``trio``s.
:()
Don't worry, we've heard you'll barely notice. You might hallucinate
a few more propagating errors and feel like your digestion has
slowed but if anything get's too bad your parents will know about
it.
:)
"""
async def aio_main(trio_main):
loop = asyncio.get_running_loop()
trio_done_fut = asyncio.Future()
def trio_done_callback(main_outcome):
log.info(f"trio_main finished: {main_outcome!r}")
trio_done_fut.set_result(main_outcome)
# start the infection: run trio on the asyncio loop in "guest mode"
log.info(f"Infecting asyncio process with {trio_main}")
trio.lowlevel.start_guest_run(
trio_main,
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback,
)
(await trio_done_fut).unwrap()
asyncio.run(aio_main(trio_main))
def _mp_main(
actor: 'Actor',
accept_addr: Tuple[str, int],
forkserver_info: Tuple[Any, Any, Any, Any, Any],
start_method: str,
parent_addr: Tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None:
"""The routine called *after fork* which invokes a fresh ``trio.run``
"""
actor._forkserver_info = forkserver_info
from ._spawn import try_set_start_method
spawn_ctx = try_set_start_method(start_method)
if actor.loglevel is not None:
log.info(
f"Setting loglevel for {actor.uid} to {actor.loglevel}")
get_console_log(actor.loglevel)
assert spawn_ctx
log.info(
f"Started new {spawn_ctx.current_process()} for {actor.uid}")
_state._current_actor = actor
log.debug(f"parent_addr is {parent_addr}")
trio_main = partial(
actor._async_main,
accept_addr,
parent_addr=parent_addr
)
try:
if infect_asyncio:
actor._infected_aio = True
_asyncio_main(trio_main)
else:
trio.run(trio_main)
except KeyboardInterrupt:
pass # handle it the same way trio does?
log.info(f"Actor {actor.uid} terminated")
async def _trip_main(
actor: 'Actor',
accept_addr: Tuple[str, int],
parent_addr: Tuple[str, int] = None
) -> None:
"""Entry point for a `trio_run_in_process` subactor.
Here we don't need to call `trio.run()` since trip does that as
part of its subprocess startup sequence.
"""
if actor.loglevel is not None:
log.info(
f"Setting loglevel for {actor.uid} to {actor.loglevel}")
get_console_log(actor.loglevel)
log.info(f"Started new TRIP process for {actor.uid}")
_state._current_actor = actor
await actor._async_main(accept_addr, parent_addr=parent_addr)
log.info(f"Actor {actor.uid} terminated")

View File

@ -26,6 +26,7 @@ from ._state import current_actor
from .log import get_logger from .log import get_logger
from ._portal import Portal from ._portal import Portal
from ._actor import Actor, ActorFailure from ._actor import Actor, ActorFailure
from ._entry import _mp_main, _trip_main
log = get_logger('tractor') log = get_logger('tractor')
@ -161,6 +162,7 @@ async def new_proc(
bind_addr: Tuple[str, int], bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
use_trio_run_in_process: bool = False, use_trio_run_in_process: bool = False,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None:
"""Create a new ``multiprocessing.Process`` using the """Create a new ``multiprocessing.Process`` using the
@ -173,9 +175,12 @@ async def new_proc(
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
if use_trio_run_in_process or _spawn_method == 'trio_run_in_process': if use_trio_run_in_process or _spawn_method == 'trio_run_in_process':
if infect_asyncio:
raise NotImplementedError("Asyncio is incompatible with trip")
# trio_run_in_process # trio_run_in_process
async with trio_run_in_process.open_in_process( async with trio_run_in_process.open_in_process(
subactor._trip_main, _trip_main,
subactor,
bind_addr, bind_addr,
parent_addr, parent_addr,
) as proc: ) as proc:
@ -235,12 +240,14 @@ async def new_proc(
fs_info = (None, None, None, None, None) fs_info = (None, None, None, None, None)
proc = _ctx.Process( # type: ignore proc = _ctx.Process( # type: ignore
target=subactor._mp_main, target=_mp_main,
args=( args=(
subactor,
bind_addr, bind_addr,
fs_info, fs_info,
start_method, start_method,
parent_addr parent_addr,
infect_asyncio,
), ),
# daemon=True, # daemon=True,
name=name, name=name,

View File

@ -1,6 +1,7 @@
""" """
``trio`` inspired apis and helpers ``trio`` inspired apis and helpers
""" """
from functools import partial
import multiprocessing as mp import multiprocessing as mp
from typing import Tuple, List, Dict, Optional, Any from typing import Tuple, List, Dict, Optional, Any
import typing import typing
@ -10,7 +11,7 @@ from async_generator import asynccontextmanager
from ._state import current_actor from ._state import current_actor
from .log import get_logger, get_loglevel from .log import get_logger, get_loglevel
from ._actor import Actor # , ActorFailure from ._actor import Actor
from ._portal import Portal from ._portal import Portal
from . import _spawn from . import _spawn
@ -51,6 +52,7 @@ class ActorNursery:
rpc_module_paths: List[str] = None, rpc_module_paths: List[str] = None,
loglevel: str = None, # set log level per subactor loglevel: str = None, # set log level per subactor
nursery: trio.Nursery = None, nursery: trio.Nursery = None,
infect_asyncio: bool = False,
) -> Portal: ) -> Portal:
loglevel = loglevel or self._actor.loglevel or get_loglevel() loglevel = loglevel or self._actor.loglevel or get_loglevel()
@ -71,13 +73,16 @@ class ActorNursery:
# XXX: the type ignore is actually due to a `mypy` bug # XXX: the type ignore is actually due to a `mypy` bug
return await nursery.start( # type: ignore return await nursery.start( # type: ignore
_spawn.new_proc, partial(
name, _spawn.new_proc,
self, name,
subactor, self,
self.errors, subactor,
bind_addr, self.errors,
parent_addr, bind_addr,
parent_addr,
infect_asyncio=infect_asyncio,
)
) )
async def run_in_actor( async def run_in_actor(
@ -88,6 +93,7 @@ class ActorNursery:
rpc_module_paths: Optional[List[str]] = None, rpc_module_paths: Optional[List[str]] = None,
statespace: Dict[str, Any] = None, statespace: Dict[str, Any] = None,
loglevel: str = None, # set log level per subactor loglevel: str = None, # set log level per subactor
infect_asyncio: bool = False,
**kwargs, # explicit args to ``fn`` **kwargs, # explicit args to ``fn``
) -> Portal: ) -> Portal:
"""Spawn a new actor, run a lone task, then terminate the actor and """Spawn a new actor, run a lone task, then terminate the actor and
@ -106,6 +112,7 @@ class ActorNursery:
loglevel=loglevel, loglevel=loglevel,
# use the run_in_actor nursery # use the run_in_actor nursery
nursery=self._ria_nursery, nursery=self._ria_nursery,
infect_asyncio=infect_asyncio,
) )
# this marks the actor to be cancelled after its portal result # this marks the actor to be cancelled after its portal result
# is retreived, see logic in `open_nursery()` below. # is retreived, see logic in `open_nursery()` below.
@ -131,7 +138,7 @@ class ActorNursery:
# send KeyBoardInterrupt (trio abort signal) to sub-actors # send KeyBoardInterrupt (trio abort signal) to sub-actors
# os.kill(proc.pid, signal.SIGINT) # os.kill(proc.pid, signal.SIGINT)
log.debug(f"Cancelling nursery") log.debug("Cancelling nursery")
with trio.move_on_after(3) as cs: with trio.move_on_after(3) as cs:
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
for subactor, proc, portal in self._children.values(): for subactor, proc, portal in self._children.values():
@ -260,7 +267,7 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
# Last bit before first nursery block ends in the case # Last bit before first nursery block ends in the case
# where we didn't error in the caller's scope # where we didn't error in the caller's scope
log.debug(f"Waiting on all subactors to complete") log.debug("Waiting on all subactors to complete")
anursery._join_procs.set() anursery._join_procs.set()
# ria_nursery scope end # ria_nursery scope end
@ -293,4 +300,4 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
# ria_nursery scope end # ria_nursery scope end
log.debug(f"Nursery teardown complete") log.debug("Nursery teardown complete")

View File

@ -0,0 +1,70 @@
"""
Infection apis for ``asyncio`` loops running ``trio`` using guest mode.
"""
import asyncio
import inspect
from typing import (
Any,
Callable,
AsyncGenerator,
Awaitable,
Union,
)
import trio
async def _invoke(
from_trio,
to_trio,
coro
) -> Union[AsyncGenerator, Awaitable]:
"""Await or stream awaiable object based on type into
``trio`` memory channel.
"""
async def stream_from_gen(c):
async for item in c:
to_trio.put_nowait(item)
to_trio.put_nowait
async def just_return(c):
to_trio.put_nowait(await c)
if inspect.isasyncgen(coro):
return await stream_from_gen(coro)
elif inspect.iscoroutine(coro):
return await coro
# TODO: make this some kind of tractor.to_asyncio.run()
async def run(
func: Callable,
qsize: int = 2**10,
**kwargs,
) -> Any:
"""Run an ``asyncio`` async function or generator in a task, return
or stream the result back to ``trio``.
"""
# ITC (inter task comms)
from_trio = asyncio.Queue(qsize)
to_trio, from_aio = trio.open_memory_channel(qsize)
# allow target func to accept/stream results manually
kwargs['to_trio'] = to_trio
kwargs['from_trio'] = to_trio
coro = func(**kwargs)
# start the asyncio task we submitted from trio
# TODO: try out ``anyio`` asyncio based tg here
asyncio.create_task(_invoke(from_trio, to_trio, coro))
# determine return type async func vs. gen
if inspect.isasyncgen(coro):
await from_aio.get()
elif inspect.iscoroutine(coro):
async def gen():
async for tick in from_aio:
yield tuple(tick)
return gen()