From 8e321995097ed3983091e2d0b7dc40c6bc208971 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 24 Jul 2020 16:55:34 -0400 Subject: [PATCH] Get entry points reorg without asyncio compat This is an edit to factor out changes needed for the `asyncio` in guest mode integration (which currently isn't tested well) so that later more pertinent changes (which are tested well) can be rebased off of this branch and merged into mainline sooner. The *infect_asyncio* branch will need to be rebased onto this branch as well before merge to mainline. --- tractor/__init__.py | 2 -- tractor/_entry.py | 52 ++------------------------------ tractor/_trionics.py | 4 --- tractor/to_asyncio.py | 70 ------------------------------------------- 4 files changed, 2 insertions(+), 126 deletions(-) delete mode 100644 tractor/to_asyncio.py diff --git a/tractor/__init__.py b/tractor/__init__.py index 4a2cda9..80eaf05 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -20,7 +20,6 @@ from ._state import current_actor from ._exceptions import RemoteActorError, ModuleNotExposed from . import msg from . import _spawn -from . import to_asyncio __all__ = [ @@ -36,7 +35,6 @@ __all__ = [ 'RemoteActorError', 'ModuleNotExposed', 'msg' - 'to_asyncio' ] diff --git a/tractor/_entry.py b/tractor/_entry.py index ff3cce7..a6a33db 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -1,9 +1,8 @@ """ Process entry points. """ -import asyncio from functools import partial -from typing import Tuple, Any, Awaitable +from typing import Tuple, Any import trio # type: ignore @@ -12,52 +11,9 @@ from .log import get_console_log, get_logger from . import _state -__all__ = ('run',) - - log = get_logger(__name__) -def _asyncio_main( - trio_main: Awaitable, -) -> None: - """Entry for an "infected ``asyncio`` actor". - - Uh, oh. :o - - It looks like your event loop has caught a case of the ``trio``s. - - :() - - Don't worry, we've heard you'll barely notice. You might hallucinate - a few more propagating errors and feel like your digestion has - slowed but if anything get's too bad your parents will know about - it. - - :) - """ - async def aio_main(trio_main): - loop = asyncio.get_running_loop() - - trio_done_fut = asyncio.Future() - - def trio_done_callback(main_outcome): - log.info(f"trio_main finished: {main_outcome!r}") - trio_done_fut.set_result(main_outcome) - - # start the infection: run trio on the asyncio loop in "guest mode" - log.info(f"Infecting asyncio process with {trio_main}") - trio.lowlevel.start_guest_run( - trio_main, - run_sync_soon_threadsafe=loop.call_soon_threadsafe, - done_callback=trio_done_callback, - ) - - (await trio_done_fut).unwrap() - - asyncio.run(aio_main(trio_main)) - - def _mp_main( actor: 'Actor', accept_addr: Tuple[str, int], @@ -90,11 +46,7 @@ def _mp_main( parent_addr=parent_addr ) try: - if infect_asyncio: - actor._infected_aio = True - _asyncio_main(trio_main) - else: - trio.run(trio_main) + trio.run(trio_main) except KeyboardInterrupt: pass # handle it the same way trio does? log.info(f"Actor {actor.uid} terminated") diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 5f2f49c..cff54fe 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -52,7 +52,6 @@ class ActorNursery: rpc_module_paths: List[str] = None, loglevel: str = None, # set log level per subactor nursery: trio.Nursery = None, - infect_asyncio: bool = False, ) -> Portal: loglevel = loglevel or self._actor.loglevel or get_loglevel() @@ -81,7 +80,6 @@ class ActorNursery: self.errors, bind_addr, parent_addr, - infect_asyncio=infect_asyncio, ) ) @@ -93,7 +91,6 @@ class ActorNursery: rpc_module_paths: Optional[List[str]] = None, statespace: Dict[str, Any] = None, loglevel: str = None, # set log level per subactor - infect_asyncio: bool = False, **kwargs, # explicit args to ``fn`` ) -> Portal: """Spawn a new actor, run a lone task, then terminate the actor and @@ -112,7 +109,6 @@ class ActorNursery: loglevel=loglevel, # use the run_in_actor nursery nursery=self._ria_nursery, - infect_asyncio=infect_asyncio, ) # this marks the actor to be cancelled after its portal result # is retreived, see logic in `open_nursery()` below. diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py deleted file mode 100644 index 5d90f22..0000000 --- a/tractor/to_asyncio.py +++ /dev/null @@ -1,70 +0,0 @@ -""" -Infection apis for ``asyncio`` loops running ``trio`` using guest mode. -""" -import asyncio -import inspect -from typing import ( - Any, - Callable, - AsyncGenerator, - Awaitable, - Union, -) - -import trio - - -async def _invoke( - from_trio, - to_trio, - coro -) -> Union[AsyncGenerator, Awaitable]: - """Await or stream awaiable object based on type into - ``trio`` memory channel. - """ - async def stream_from_gen(c): - async for item in c: - to_trio.put_nowait(item) - to_trio.put_nowait - - async def just_return(c): - to_trio.put_nowait(await c) - - if inspect.isasyncgen(coro): - return await stream_from_gen(coro) - elif inspect.iscoroutine(coro): - return await coro - - -# TODO: make this some kind of tractor.to_asyncio.run() -async def run( - func: Callable, - qsize: int = 2**10, - **kwargs, -) -> Any: - """Run an ``asyncio`` async function or generator in a task, return - or stream the result back to ``trio``. - """ - # ITC (inter task comms) - from_trio = asyncio.Queue(qsize) - to_trio, from_aio = trio.open_memory_channel(qsize) - - # allow target func to accept/stream results manually - kwargs['to_trio'] = to_trio - kwargs['from_trio'] = to_trio - - coro = func(**kwargs) - - # start the asyncio task we submitted from trio - # TODO: try out ``anyio`` asyncio based tg here - asyncio.create_task(_invoke(from_trio, to_trio, coro)) - - # determine return type async func vs. gen - if inspect.isasyncgen(coro): - await from_aio.get() - elif inspect.iscoroutine(coro): - async def gen(): - async for tick in from_aio: - yield tuple(tick) - - return gen()