Wow, fix all the broken async func invoking code..

Clearly this wasn't developed against a task that spawned just an async
func in `asyncio`.. Fix all that and remove a bunch of unnecessary func
layers. Add provisional support for the target receiving the `to_trio`
and `from_trio` channels and for the @tractor.stream marker.
drop-trip-update-trio
Tyler Goodlet 2020-07-03 17:33:46 -04:00
parent fcd1566834
commit 6d5ebb9aa7
1 changed files with 36 additions and 31 deletions

View File

@ -4,7 +4,6 @@ Infection apis for ``asyncio`` loops running ``trio`` using guest mode.
import asyncio import asyncio
import inspect import inspect
from typing import ( from typing import (
Any,
Callable, Callable,
AsyncGenerator, AsyncGenerator,
Awaitable, Awaitable,
@ -19,47 +18,55 @@ from ._state import current_actor
log = get_logger(__name__) log = get_logger(__name__)
__all__ = ['run_task'] __all__ = ['run_task', 'run_as_asyncio_guest']
async def _invoke( async def _invoke(
from_trio: trio.abc.ReceiveChannel, from_trio: trio.abc.ReceiveChannel,
to_trio: asyncio.Queue, to_trio: asyncio.Queue,
coro: Awaitable, coro: Awaitable,
) -> Union[AsyncGenerator, Awaitable]: ) -> None:
"""Await or stream awaiable object based on type into """Await or stream awaiable object based on ``coro`` type into
``trio`` memory channel. ``trio`` memory channel.
``from_trio`` might eventually be used here for bidirectional streaming.
""" """
async def stream_from_gen(c):
async for item in c:
to_trio.send_nowait(item)
async def just_return(c):
to_trio.send_nowait(await c)
if inspect.isasyncgen(coro): if inspect.isasyncgen(coro):
return await stream_from_gen(coro) async for item in coro:
to_trio.send_nowait(item)
elif inspect.iscoroutine(coro): elif inspect.iscoroutine(coro):
return await coro to_trio.send_nowait(await coro)
async def run_task( async def run_task(
func: Callable, func: Callable,
*,
qsize: int = 2**10, qsize: int = 2**10,
_treat_as_stream: bool = False,
**kwargs, **kwargs,
) -> Any: ) -> Union[AsyncGenerator, Awaitable]:
"""Run an ``asyncio`` async function or generator in a task, return """Run an ``asyncio`` async function or generator in a task, return
or stream the result back to ``trio``. or stream the result back to ``trio``.
""" """
assert current_actor()._infected_aio assert current_actor().is_infected_aio()
# ITC (inter task comms) # ITC (inter task comms)
from_trio = asyncio.Queue(qsize) from_trio = asyncio.Queue(qsize)
to_trio, from_aio = trio.open_memory_channel(qsize) to_trio, from_aio = trio.open_memory_channel(qsize)
# allow target func to accept/stream results manually args = tuple(inspect.getfullargspec(func).args)
kwargs['to_trio'] = to_trio
kwargs['from_trio'] = to_trio if getattr(func, '_tractor_steam_function', None):
# the assumption is that the target async routine accepts the
# send channel then it intends to yield more then one return
# value otherwise it would just return ;P
_treat_as_stream = True
# allow target func to accept/stream results manually by name
if 'to_trio' in args:
kwargs['to_trio'] = to_trio
if 'from_trio' in args:
kwargs['from_trio'] = to_trio
coro = func(**kwargs) coro = func(**kwargs)
@ -70,7 +77,6 @@ async def run_task(
task = asyncio.create_task(_invoke(from_trio, to_trio, coro)) task = asyncio.create_task(_invoke(from_trio, to_trio, coro))
err = None err = None
# XXX: I'm not sure this actually does anything...
def cancel_trio(task): def cancel_trio(task):
"""Cancel the calling ``trio`` task on error. """Cancel the calling ``trio`` task on error.
""" """
@ -80,17 +86,8 @@ async def run_task(
task.add_done_callback(cancel_trio) task.add_done_callback(cancel_trio)
# determine return type async func vs. gen # asycn gen
if inspect.isasyncgen(coro): if inspect.isasyncgen(coro) or _treat_as_stream:
# simple async func
async def result():
with cancel_scope:
return await from_aio.get()
if cancel_scope.cancelled_caught and err:
raise err
elif inspect.iscoroutine(coro):
# asycn gen
async def result(): async def result():
with cancel_scope: with cancel_scope:
async with from_aio: async with from_aio:
@ -99,7 +96,15 @@ async def run_task(
if cancel_scope.cancelled_caught and err: if cancel_scope.cancelled_caught and err:
raise err raise err
return result() return result()
# simple async func
elif inspect.iscoroutine(coro):
with cancel_scope:
result = await from_aio.receive()
return result
if cancel_scope.cancelled_caught and err:
raise err
def run_as_asyncio_guest( def run_as_asyncio_guest(