diff --git a/tractor/__init__.py b/tractor/__init__.py index 4a2cda9..80eaf05 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -20,7 +20,6 @@ from ._state import current_actor from ._exceptions import RemoteActorError, ModuleNotExposed from . import msg from . import _spawn -from . import to_asyncio __all__ = [ @@ -36,7 +35,6 @@ __all__ = [ 'RemoteActorError', 'ModuleNotExposed', 'msg' - 'to_asyncio' ] diff --git a/tractor/_entry.py b/tractor/_entry.py index ff3cce7..a6a33db 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -1,9 +1,8 @@ """ Process entry points. """ -import asyncio from functools import partial -from typing import Tuple, Any, Awaitable +from typing import Tuple, Any import trio # type: ignore @@ -12,52 +11,9 @@ from .log import get_console_log, get_logger from . import _state -__all__ = ('run',) - - log = get_logger(__name__) -def _asyncio_main( - trio_main: Awaitable, -) -> None: - """Entry for an "infected ``asyncio`` actor". - - Uh, oh. :o - - It looks like your event loop has caught a case of the ``trio``s. - - :() - - Don't worry, we've heard you'll barely notice. You might hallucinate - a few more propagating errors and feel like your digestion has - slowed but if anything get's too bad your parents will know about - it. - - :) - """ - async def aio_main(trio_main): - loop = asyncio.get_running_loop() - - trio_done_fut = asyncio.Future() - - def trio_done_callback(main_outcome): - log.info(f"trio_main finished: {main_outcome!r}") - trio_done_fut.set_result(main_outcome) - - # start the infection: run trio on the asyncio loop in "guest mode" - log.info(f"Infecting asyncio process with {trio_main}") - trio.lowlevel.start_guest_run( - trio_main, - run_sync_soon_threadsafe=loop.call_soon_threadsafe, - done_callback=trio_done_callback, - ) - - (await trio_done_fut).unwrap() - - asyncio.run(aio_main(trio_main)) - - def _mp_main( actor: 'Actor', accept_addr: Tuple[str, int], @@ -90,11 +46,7 @@ def _mp_main( parent_addr=parent_addr ) try: - if infect_asyncio: - actor._infected_aio = True - _asyncio_main(trio_main) - else: - trio.run(trio_main) + trio.run(trio_main) except KeyboardInterrupt: pass # handle it the same way trio does? log.info(f"Actor {actor.uid} terminated") diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 5f2f49c..cff54fe 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -52,7 +52,6 @@ class ActorNursery: rpc_module_paths: List[str] = None, loglevel: str = None, # set log level per subactor nursery: trio.Nursery = None, - infect_asyncio: bool = False, ) -> Portal: loglevel = loglevel or self._actor.loglevel or get_loglevel() @@ -81,7 +80,6 @@ class ActorNursery: self.errors, bind_addr, parent_addr, - infect_asyncio=infect_asyncio, ) ) @@ -93,7 +91,6 @@ class ActorNursery: rpc_module_paths: Optional[List[str]] = None, statespace: Dict[str, Any] = None, loglevel: str = None, # set log level per subactor - infect_asyncio: bool = False, **kwargs, # explicit args to ``fn`` ) -> Portal: """Spawn a new actor, run a lone task, then terminate the actor and @@ -112,7 +109,6 @@ class ActorNursery: loglevel=loglevel, # use the run_in_actor nursery nursery=self._ria_nursery, - infect_asyncio=infect_asyncio, ) # this marks the actor to be cancelled after its portal result # is retreived, see logic in `open_nursery()` below. diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py deleted file mode 100644 index 5d90f22..0000000 --- a/tractor/to_asyncio.py +++ /dev/null @@ -1,70 +0,0 @@ -""" -Infection apis for ``asyncio`` loops running ``trio`` using guest mode. -""" -import asyncio -import inspect -from typing import ( - Any, - Callable, - AsyncGenerator, - Awaitable, - Union, -) - -import trio - - -async def _invoke( - from_trio, - to_trio, - coro -) -> Union[AsyncGenerator, Awaitable]: - """Await or stream awaiable object based on type into - ``trio`` memory channel. - """ - async def stream_from_gen(c): - async for item in c: - to_trio.put_nowait(item) - to_trio.put_nowait - - async def just_return(c): - to_trio.put_nowait(await c) - - if inspect.isasyncgen(coro): - return await stream_from_gen(coro) - elif inspect.iscoroutine(coro): - return await coro - - -# TODO: make this some kind of tractor.to_asyncio.run() -async def run( - func: Callable, - qsize: int = 2**10, - **kwargs, -) -> Any: - """Run an ``asyncio`` async function or generator in a task, return - or stream the result back to ``trio``. - """ - # ITC (inter task comms) - from_trio = asyncio.Queue(qsize) - to_trio, from_aio = trio.open_memory_channel(qsize) - - # allow target func to accept/stream results manually - kwargs['to_trio'] = to_trio - kwargs['from_trio'] = to_trio - - coro = func(**kwargs) - - # start the asyncio task we submitted from trio - # TODO: try out ``anyio`` asyncio based tg here - asyncio.create_task(_invoke(from_trio, to_trio, coro)) - - # determine return type async func vs. gen - if inspect.isasyncgen(coro): - await from_aio.get() - elif inspect.iscoroutine(coro): - async def gen(): - async for tick in from_aio: - yield tuple(tick) - - return gen()