forked from goodboy/tractor
1
0
Fork 0

Attempt to make mypy happy..

msgspec_infect_asyncio
Tyler Goodlet 2020-07-21 10:32:37 -04:00
parent 45a743cdd4
commit 92594d8222
1 changed files with 36 additions and 22 deletions

View File

@ -4,6 +4,7 @@ Infection apis for ``asyncio`` loops running ``trio`` using guest mode.
import asyncio import asyncio
import inspect import inspect
from typing import ( from typing import (
Any,
Callable, Callable,
AsyncGenerator, AsyncGenerator,
Awaitable, Awaitable,
@ -21,21 +22,26 @@ log = get_logger(__name__)
__all__ = ['run_task', 'run_as_asyncio_guest'] __all__ = ['run_task', 'run_as_asyncio_guest']
async def _invoke( async def run_coro(
from_trio: trio.abc.ReceiveChannel, to_trio: trio.MemorySendChannel,
to_trio: asyncio.Queue,
coro: Awaitable, coro: Awaitable,
) -> None: ) -> None:
"""Await or stream awaiable object based on ``coro`` type into """Await ``coro`` and relay result back to ``trio``.
``trio`` memory channel. """
to_trio.send_nowait(await coro)
``from_trio`` might eventually be used here for bidirectional streaming.
async def consume_asyncgen(
to_trio: trio.MemorySendChannel,
coro: AsyncGenerator,
) -> None:
"""Stream async generator results back to ``trio``.
``from_trio`` might eventually be used here for
bidirectional streaming.
""" """
if inspect.isasyncgen(coro):
async for item in coro: async for item in coro:
to_trio.send_nowait(item) to_trio.send_nowait(item)
elif inspect.iscoroutine(coro):
to_trio.send_nowait(await coro)
async def run_task( async def run_task(
@ -44,15 +50,15 @@ async def run_task(
qsize: int = 2**10, qsize: int = 2**10,
_treat_as_stream: bool = False, _treat_as_stream: bool = False,
**kwargs, **kwargs,
) -> Union[AsyncGenerator, Awaitable]: ) -> Union[AsyncGenerator, Any]:
"""Run an ``asyncio`` async function or generator in a task, return """Run an ``asyncio`` async function or generator in a task, return
or stream the result back to ``trio``. or stream the result back to ``trio``.
""" """
assert current_actor().is_infected_aio() assert current_actor().is_infected_aio()
# ITC (inter task comms) # ITC (inter task comms)
from_trio = asyncio.Queue(qsize) from_trio = asyncio.Queue(qsize) # type: ignore
to_trio, from_aio = trio.open_memory_channel(qsize) to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore
args = tuple(inspect.getfullargspec(func).args) args = tuple(inspect.getfullargspec(func).args)
@ -66,7 +72,7 @@ async def run_task(
if 'to_trio' in args: if 'to_trio' in args:
kwargs['to_trio'] = to_trio kwargs['to_trio'] = to_trio
if 'from_trio' in args: if 'from_trio' in args:
kwargs['from_trio'] = to_trio kwargs['from_trio'] = from_trio
coro = func(**kwargs) coro = func(**kwargs)
@ -74,7 +80,13 @@ async def run_task(
# start the asyncio task we submitted from trio # start the asyncio task we submitted from trio
# TODO: try out ``anyio`` asyncio based tg here # TODO: try out ``anyio`` asyncio based tg here
task = asyncio.create_task(_invoke(from_trio, to_trio, coro)) if inspect.isawaitable(coro):
task = asyncio.create_task(run_coro(to_trio, coro))
elif inspect.isasyncgen(coro):
task = asyncio.create_task(consume_asyncgen(to_trio, coro))
else:
raise TypeError(f"No support for {coro}")
err = None err = None
def cancel_trio(task): def cancel_trio(task):
@ -88,27 +100,29 @@ async def run_task(
# asycn gen # asycn gen
if inspect.isasyncgen(coro) or _treat_as_stream: if inspect.isasyncgen(coro) or _treat_as_stream:
async def result():
async def stream_results():
with cancel_scope: with cancel_scope:
# stream values upward
async with from_aio: async with from_aio:
async for item in from_aio: async for item in from_aio:
yield item yield item
if cancel_scope.cancelled_caught and err: if cancel_scope.cancelled_caught and err:
raise err raise err
return result() return stream_results()
# simple async func # simple async func
elif inspect.iscoroutine(coro): elif inspect.iscoroutine(coro):
with cancel_scope: with cancel_scope:
result = await from_aio.receive() # return single value
return result return await from_aio.receive()
if cancel_scope.cancelled_caught and err: if cancel_scope.cancelled_caught and err:
raise err raise err
def run_as_asyncio_guest( def run_as_asyncio_guest(
trio_main: Awaitable, trio_main: Callable,
) -> None: ) -> None:
"""Entry for an "infected ``asyncio`` actor". """Entry for an "infected ``asyncio`` actor".