diff --git a/tractor/__init__.py b/tractor/__init__.py index 80eaf05..4a2cda9 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -20,6 +20,7 @@ from ._state import current_actor from ._exceptions import RemoteActorError, ModuleNotExposed from . import msg from . import _spawn +from . import to_asyncio __all__ = [ @@ -35,6 +36,7 @@ __all__ = [ 'RemoteActorError', 'ModuleNotExposed', 'msg' + 'to_asyncio' ] diff --git a/tractor/_entry.py b/tractor/_entry.py new file mode 100644 index 0000000..ff3cce7 --- /dev/null +++ b/tractor/_entry.py @@ -0,0 +1,121 @@ +""" +Process entry points. +""" +import asyncio +from functools import partial +from typing import Tuple, Any, Awaitable + +import trio # type: ignore + +from ._actor import Actor +from .log import get_console_log, get_logger +from . import _state + + +__all__ = ('run',) + + +log = get_logger(__name__) + + +def _asyncio_main( + trio_main: Awaitable, +) -> None: + """Entry for an "infected ``asyncio`` actor". + + Uh, oh. :o + + It looks like your event loop has caught a case of the ``trio``s. + + :() + + Don't worry, we've heard you'll barely notice. You might hallucinate + a few more propagating errors and feel like your digestion has + slowed but if anything get's too bad your parents will know about + it. + + :) + """ + async def aio_main(trio_main): + loop = asyncio.get_running_loop() + + trio_done_fut = asyncio.Future() + + def trio_done_callback(main_outcome): + log.info(f"trio_main finished: {main_outcome!r}") + trio_done_fut.set_result(main_outcome) + + # start the infection: run trio on the asyncio loop in "guest mode" + log.info(f"Infecting asyncio process with {trio_main}") + trio.lowlevel.start_guest_run( + trio_main, + run_sync_soon_threadsafe=loop.call_soon_threadsafe, + done_callback=trio_done_callback, + ) + + (await trio_done_fut).unwrap() + + asyncio.run(aio_main(trio_main)) + + +def _mp_main( + actor: 'Actor', + accept_addr: Tuple[str, int], + forkserver_info: Tuple[Any, Any, Any, Any, Any], + start_method: str, + parent_addr: Tuple[str, int] = None, + infect_asyncio: bool = False, +) -> None: + """The routine called *after fork* which invokes a fresh ``trio.run`` + """ + actor._forkserver_info = forkserver_info + from ._spawn import try_set_start_method + spawn_ctx = try_set_start_method(start_method) + + if actor.loglevel is not None: + log.info( + f"Setting loglevel for {actor.uid} to {actor.loglevel}") + get_console_log(actor.loglevel) + + assert spawn_ctx + log.info( + f"Started new {spawn_ctx.current_process()} for {actor.uid}") + + _state._current_actor = actor + + log.debug(f"parent_addr is {parent_addr}") + trio_main = partial( + actor._async_main, + accept_addr, + parent_addr=parent_addr + ) + try: + if infect_asyncio: + actor._infected_aio = True + _asyncio_main(trio_main) + else: + trio.run(trio_main) + except KeyboardInterrupt: + pass # handle it the same way trio does? + log.info(f"Actor {actor.uid} terminated") + + +async def _trip_main( + actor: 'Actor', + accept_addr: Tuple[str, int], + parent_addr: Tuple[str, int] = None +) -> None: + """Entry point for a `trio_run_in_process` subactor. + + Here we don't need to call `trio.run()` since trip does that as + part of its subprocess startup sequence. + """ + if actor.loglevel is not None: + log.info( + f"Setting loglevel for {actor.uid} to {actor.loglevel}") + get_console_log(actor.loglevel) + + log.info(f"Started new TRIP process for {actor.uid}") + _state._current_actor = actor + await actor._async_main(accept_addr, parent_addr=parent_addr) + log.info(f"Actor {actor.uid} terminated") diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 5700131..09c36e5 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -26,6 +26,7 @@ from ._state import current_actor from .log import get_logger from ._portal import Portal from ._actor import Actor, ActorFailure +from ._entry import _mp_main, _trip_main log = get_logger('tractor') @@ -161,6 +162,7 @@ async def new_proc( bind_addr: Tuple[str, int], parent_addr: Tuple[str, int], use_trio_run_in_process: bool = False, + infect_asyncio: bool = False, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: """Create a new ``multiprocessing.Process`` using the @@ -173,9 +175,12 @@ async def new_proc( async with trio.open_nursery() as nursery: if use_trio_run_in_process or _spawn_method == 'trio_run_in_process': + if infect_asyncio: + raise NotImplementedError("Asyncio is incompatible with trip") # trio_run_in_process async with trio_run_in_process.open_in_process( - subactor._trip_main, + _trip_main, + subactor, bind_addr, parent_addr, ) as proc: @@ -235,12 +240,14 @@ async def new_proc( fs_info = (None, None, None, None, None) proc = _ctx.Process( # type: ignore - target=subactor._mp_main, + target=_mp_main, args=( + subactor, bind_addr, fs_info, start_method, - parent_addr + parent_addr, + infect_asyncio, ), # daemon=True, name=name, diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 17ae548..5f2f49c 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -1,6 +1,7 @@ """ ``trio`` inspired apis and helpers """ +from functools import partial import multiprocessing as mp from typing import Tuple, List, Dict, Optional, Any import typing @@ -10,7 +11,7 @@ from async_generator import asynccontextmanager from ._state import current_actor from .log import get_logger, get_loglevel -from ._actor import Actor # , ActorFailure +from ._actor import Actor from ._portal import Portal from . import _spawn @@ -51,6 +52,7 @@ class ActorNursery: rpc_module_paths: List[str] = None, loglevel: str = None, # set log level per subactor nursery: trio.Nursery = None, + infect_asyncio: bool = False, ) -> Portal: loglevel = loglevel or self._actor.loglevel or get_loglevel() @@ -71,13 +73,16 @@ class ActorNursery: # XXX: the type ignore is actually due to a `mypy` bug return await nursery.start( # type: ignore - _spawn.new_proc, - name, - self, - subactor, - self.errors, - bind_addr, - parent_addr, + partial( + _spawn.new_proc, + name, + self, + subactor, + self.errors, + bind_addr, + parent_addr, + infect_asyncio=infect_asyncio, + ) ) async def run_in_actor( @@ -88,6 +93,7 @@ class ActorNursery: rpc_module_paths: Optional[List[str]] = None, statespace: Dict[str, Any] = None, loglevel: str = None, # set log level per subactor + infect_asyncio: bool = False, **kwargs, # explicit args to ``fn`` ) -> Portal: """Spawn a new actor, run a lone task, then terminate the actor and @@ -106,6 +112,7 @@ class ActorNursery: loglevel=loglevel, # use the run_in_actor nursery nursery=self._ria_nursery, + infect_asyncio=infect_asyncio, ) # this marks the actor to be cancelled after its portal result # is retreived, see logic in `open_nursery()` below. @@ -131,7 +138,7 @@ class ActorNursery: # send KeyBoardInterrupt (trio abort signal) to sub-actors # os.kill(proc.pid, signal.SIGINT) - log.debug(f"Cancelling nursery") + log.debug("Cancelling nursery") with trio.move_on_after(3) as cs: async with trio.open_nursery() as nursery: for subactor, proc, portal in self._children.values(): @@ -260,7 +267,7 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: # Last bit before first nursery block ends in the case # where we didn't error in the caller's scope - log.debug(f"Waiting on all subactors to complete") + log.debug("Waiting on all subactors to complete") anursery._join_procs.set() # ria_nursery scope end @@ -293,4 +300,4 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: # ria_nursery scope end - log.debug(f"Nursery teardown complete") + log.debug("Nursery teardown complete") diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py new file mode 100644 index 0000000..5d90f22 --- /dev/null +++ b/tractor/to_asyncio.py @@ -0,0 +1,70 @@ +""" +Infection apis for ``asyncio`` loops running ``trio`` using guest mode. +""" +import asyncio +import inspect +from typing import ( + Any, + Callable, + AsyncGenerator, + Awaitable, + Union, +) + +import trio + + +async def _invoke( + from_trio, + to_trio, + coro +) -> Union[AsyncGenerator, Awaitable]: + """Await or stream awaiable object based on type into + ``trio`` memory channel. + """ + async def stream_from_gen(c): + async for item in c: + to_trio.put_nowait(item) + to_trio.put_nowait + + async def just_return(c): + to_trio.put_nowait(await c) + + if inspect.isasyncgen(coro): + return await stream_from_gen(coro) + elif inspect.iscoroutine(coro): + return await coro + + +# TODO: make this some kind of tractor.to_asyncio.run() +async def run( + func: Callable, + qsize: int = 2**10, + **kwargs, +) -> Any: + """Run an ``asyncio`` async function or generator in a task, return + or stream the result back to ``trio``. + """ + # ITC (inter task comms) + from_trio = asyncio.Queue(qsize) + to_trio, from_aio = trio.open_memory_channel(qsize) + + # allow target func to accept/stream results manually + kwargs['to_trio'] = to_trio + kwargs['from_trio'] = to_trio + + coro = func(**kwargs) + + # start the asyncio task we submitted from trio + # TODO: try out ``anyio`` asyncio based tg here + asyncio.create_task(_invoke(from_trio, to_trio, coro)) + + # determine return type async func vs. gen + if inspect.isasyncgen(coro): + await from_aio.get() + elif inspect.iscoroutine(coro): + async def gen(): + async for tick in from_aio: + yield tuple(tick) + + return gen()