Wow, fix all the broken async func invoking code..

Clearly this wasn't developed against a task that spawned just an async
func in `asyncio`.. Fix all that and remove a bunch of unnecessary func
layers. Add provisional support for the target receiving the `to_trio`
and `from_trio` channels and for the @tractor.stream marker.
wip_fix_asyncio_gen_streaming
Tyler Goodlet 2020-07-03 17:33:46 -04:00
parent 940774f215
commit a9985c0c01
1 changed files with 36 additions and 31 deletions

View File

@ -4,7 +4,6 @@ Infection apis for ``asyncio`` loops running ``trio`` using guest mode.
import asyncio
import inspect
from typing import (
Any,
Callable,
AsyncGenerator,
Awaitable,
@ -19,46 +18,54 @@ from ._state import current_actor
log = get_logger(__name__)
__all__ = ['run_task']
__all__ = ['run_task', 'run_as_asyncio_guest']
async def _invoke(
from_trio: trio.abc.ReceiveChannel,
to_trio: asyncio.Queue,
coro: Awaitable,
) -> Union[AsyncGenerator, Awaitable]:
"""Await or stream awaiable object based on type into
) -> None:
"""Await or stream awaiable object based on ``coro`` type into
``trio`` memory channel.
``from_trio`` might eventually be used here for bidirectional streaming.
"""
async def stream_from_gen(c):
async for item in c:
to_trio.send_nowait(item)
async def just_return(c):
to_trio.send_nowait(await c)
if inspect.isasyncgen(coro):
return await stream_from_gen(coro)
async for item in coro:
to_trio.send_nowait(item)
elif inspect.iscoroutine(coro):
return await coro
to_trio.send_nowait(await coro)
async def run_task(
func: Callable,
*,
qsize: int = 2**10,
_treat_as_stream: bool = False,
**kwargs,
) -> Any:
) -> Union[AsyncGenerator, Awaitable]:
"""Run an ``asyncio`` async function or generator in a task, return
or stream the result back to ``trio``.
"""
assert current_actor()._infected_aio
assert current_actor().is_infected_aio()
# ITC (inter task comms)
from_trio = asyncio.Queue(qsize)
to_trio, from_aio = trio.open_memory_channel(qsize)
# allow target func to accept/stream results manually
args = tuple(inspect.getfullargspec(func).args)
if getattr(func, '_tractor_steam_function', None):
# the assumption is that the target async routine accepts the
# send channel then it intends to yield more then one return
# value otherwise it would just return ;P
_treat_as_stream = True
# allow target func to accept/stream results manually by name
if 'to_trio' in args:
kwargs['to_trio'] = to_trio
if 'from_trio' in args:
kwargs['from_trio'] = to_trio
coro = func(**kwargs)
@ -70,7 +77,6 @@ async def run_task(
task = asyncio.create_task(_invoke(from_trio, to_trio, coro))
err = None
# XXX: I'm not sure this actually does anything...
def cancel_trio(task):
"""Cancel the calling ``trio`` task on error.
"""
@ -80,17 +86,8 @@ async def run_task(
task.add_done_callback(cancel_trio)
# determine return type async func vs. gen
if inspect.isasyncgen(coro):
# simple async func
async def result():
with cancel_scope:
return await from_aio.get()
if cancel_scope.cancelled_caught and err:
raise err
elif inspect.iscoroutine(coro):
# asycn gen
if inspect.isasyncgen(coro) or _treat_as_stream:
async def result():
with cancel_scope:
async with from_aio:
@ -101,6 +98,14 @@ async def run_task(
return result()
# simple async func
elif inspect.iscoroutine(coro):
with cancel_scope:
result = await from_aio.receive()
return result
if cancel_scope.cancelled_caught and err:
raise err
def run_as_asyncio_guest(
trio_main: Awaitable,