From 558ba7e008ed68eace246fcd9731afc7235fe5ca Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 3 Jul 2020 17:33:46 -0400 Subject: [PATCH] Wow, fix all the broken async func invoking code.. Clearly this wasn't developed against a task that spawned just an async func in `asyncio`.. Fix all that and remove a bunch of unnecessary func layers. Add provisional support for the target receiving the `to_trio` and `from_trio` channels and for the @tractor.stream marker. --- tractor/to_asyncio.py | 67 +++++++++++++++++++++++-------------------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index a45904a..fb8f4cd 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -4,7 +4,6 @@ Infection apis for ``asyncio`` loops running ``trio`` using guest mode. import asyncio import inspect from typing import ( - Any, Callable, AsyncGenerator, Awaitable, @@ -19,47 +18,55 @@ from ._state import current_actor log = get_logger(__name__) -__all__ = ['run_task'] +__all__ = ['run_task', 'run_as_asyncio_guest'] async def _invoke( from_trio: trio.abc.ReceiveChannel, to_trio: asyncio.Queue, coro: Awaitable, -) -> Union[AsyncGenerator, Awaitable]: - """Await or stream awaiable object based on type into +) -> None: + """Await or stream awaiable object based on ``coro`` type into ``trio`` memory channel. + + ``from_trio`` might eventually be used here for bidirectional streaming. """ - async def stream_from_gen(c): - async for item in c: - to_trio.send_nowait(item) - - async def just_return(c): - to_trio.send_nowait(await c) - if inspect.isasyncgen(coro): - return await stream_from_gen(coro) + async for item in coro: + to_trio.send_nowait(item) elif inspect.iscoroutine(coro): - return await coro + to_trio.send_nowait(await coro) async def run_task( func: Callable, + *, qsize: int = 2**10, + _treat_as_stream: bool = False, **kwargs, -) -> Any: +) -> Union[AsyncGenerator, Awaitable]: """Run an ``asyncio`` async function or generator in a task, return or stream the result back to ``trio``. """ - assert current_actor()._infected_aio + assert current_actor().is_infected_aio() # ITC (inter task comms) from_trio = asyncio.Queue(qsize) to_trio, from_aio = trio.open_memory_channel(qsize) - # allow target func to accept/stream results manually - kwargs['to_trio'] = to_trio - kwargs['from_trio'] = to_trio + args = tuple(inspect.getfullargspec(func).args) + + if getattr(func, '_tractor_steam_function', None): + # the assumption is that the target async routine accepts the + # send channel then it intends to yield more then one return + # value otherwise it would just return ;P + _treat_as_stream = True + + # allow target func to accept/stream results manually by name + if 'to_trio' in args: + kwargs['to_trio'] = to_trio + if 'from_trio' in args: + kwargs['from_trio'] = to_trio coro = func(**kwargs) @@ -70,7 +77,6 @@ async def run_task( task = asyncio.create_task(_invoke(from_trio, to_trio, coro)) err = None - # XXX: I'm not sure this actually does anything... def cancel_trio(task): """Cancel the calling ``trio`` task on error. """ @@ -80,17 +86,8 @@ async def run_task( task.add_done_callback(cancel_trio) - # determine return type async func vs. gen - if inspect.isasyncgen(coro): - # simple async func - async def result(): - with cancel_scope: - return await from_aio.get() - if cancel_scope.cancelled_caught and err: - raise err - - elif inspect.iscoroutine(coro): - # asycn gen + # asycn gen + if inspect.isasyncgen(coro) or _treat_as_stream: async def result(): with cancel_scope: async with from_aio: @@ -99,7 +96,15 @@ async def run_task( if cancel_scope.cancelled_caught and err: raise err - return result() + return result() + + # simple async func + elif inspect.iscoroutine(coro): + with cancel_scope: + result = await from_aio.receive() + return result + if cancel_scope.cancelled_caught and err: + raise err def run_as_asyncio_guest(