WIP redo asyncio async gen streaming

infect_asyncio
Tyler Goodlet 2021-04-27 12:20:33 -04:00
parent 340effae11
commit d80f8d7a39
2 changed files with 136 additions and 32 deletions

View File

@ -103,7 +103,6 @@ async def open_root_actor(
_default_arbiter_port, _default_arbiter_port,
) )
if loglevel is None: if loglevel is None:
loglevel = log.get_loglevel() loglevel = log.get_loglevel()
else: else:

View File

@ -43,15 +43,16 @@ async def consume_asyncgen(
to_trio.send_nowait(item) to_trio.send_nowait(item)
async def run_task( def _run_asyncio_task(
func: Callable, func: Callable,
*, *,
qsize: int = 2**10, qsize: int = 1,
_treat_as_stream: bool = False, _treat_as_stream: bool = False,
**kwargs, **kwargs,
) -> Any: ) -> Any:
"""Run an ``asyncio`` async function or generator in a task, return """Run an ``asyncio`` async function or generator in a task, return
or stream the result back to ``trio``. or stream the result back to ``trio``.
""" """
assert current_actor().is_infected_aio() assert current_actor().is_infected_aio()
@ -59,29 +60,38 @@ async def run_task(
from_trio = asyncio.Queue(qsize) # type: ignore from_trio = asyncio.Queue(qsize) # type: ignore
to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore
from_aio._err = None
args = tuple(inspect.getfullargspec(func).args) args = tuple(inspect.getfullargspec(func).args)
if getattr(func, '_tractor_steam_function', None): if getattr(func, '_tractor_steam_function', None):
# the assumption is that the target async routine accepts the # the assumption is that the target async routine accepts the
# send channel then it intends to yield more then one return # send channel then it intends to yield more then one return
# value otherwise it would just return ;P # value otherwise it would just return ;P
_treat_as_stream = True # _treat_as_stream = True
assert qsize > 1
# allow target func to accept/stream results manually by name # allow target func to accept/stream results manually by name
if 'to_trio' in args: if 'to_trio' in args:
kwargs['to_trio'] = to_trio kwargs['to_trio'] = to_trio
if 'from_trio' in args: if 'from_trio' in args:
kwargs['from_trio'] = from_trio kwargs['from_trio'] = from_trio
# if 'from_aio' in args:
# kwargs['from_aio'] = from_aio
coro = func(**kwargs) coro = func(**kwargs)
cancel_scope = trio.CancelScope() # cancel_scope = trio.CancelScope()
# start the asyncio task we submitted from trio # start the asyncio task we submitted from trio
if inspect.isawaitable(coro): if inspect.isawaitable(coro):
task = asyncio.create_task(run_coro(to_trio, coro)) task = asyncio.create_task(run_coro(to_trio, coro))
elif inspect.isasyncgen(coro): elif inspect.isasyncgen(coro):
task = asyncio.create_task(consume_asyncgen(to_trio, coro)) task = asyncio.create_task(consume_asyncgen(to_trio, coro))
else: else:
raise TypeError(f"No support for invoking {coro}") raise TypeError(f"No support for invoking {coro}")
@ -96,54 +106,149 @@ async def run_task(
if aio_err: if aio_err:
log.exception(f"asyncio task errorred:\n{aio_err}") log.exception(f"asyncio task errorred:\n{aio_err}")
cancel_scope.cancel() # cancel_scope.cancel()
from_aio._err = aio_err
to_trio.close()
task.add_done_callback(cancel_trio) task.add_done_callback(cancel_trio)
return task, from_aio, to_trio
async def run_task(
func: Callable,
*,
qsize: int = 2**10,
_treat_as_stream: bool = False,
**kwargs,
) -> Any:
"""Run an ``asyncio`` async function or generator in a task, return
or stream the result back to ``trio``.
"""
# assert current_actor().is_infected_aio()
# # ITC (inter task comms)
# from_trio = asyncio.Queue(qsize) # type: ignore
# to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore
# args = tuple(inspect.getfullargspec(func).args)
# if getattr(func, '_tractor_steam_function', None):
# # the assumption is that the target async routine accepts the
# # send channel then it intends to yield more then one return
# # value otherwise it would just return ;P
# _treat_as_stream = True
# # allow target func to accept/stream results manually by name
# if 'to_trio' in args:
# kwargs['to_trio'] = to_trio
# if 'from_trio' in args:
# kwargs['from_trio'] = from_trio
# coro = func(**kwargs)
# cancel_scope = trio.CancelScope()
# # start the asyncio task we submitted from trio
# if inspect.isawaitable(coro):
# task = asyncio.create_task(run_coro(to_trio, coro))
# elif inspect.isasyncgen(coro):
# task = asyncio.create_task(consume_asyncgen(to_trio, coro))
# else:
# raise TypeError(f"No support for invoking {coro}")
# aio_err = None
# def cancel_trio(task):
# """Cancel the calling ``trio`` task on error.
# """
# nonlocal aio_err
# aio_err = task.exception()
# if aio_err:
# log.exception(f"asyncio task errorred:\n{aio_err}")
# cancel_scope.cancel()
# task.add_done_callback(cancel_trio)
# async iterator # async iterator
if inspect.isasyncgen(coro) or _treat_as_stream: # if inspect.isasyncgen(coro) or _treat_as_stream:
async def stream_results(): # if inspect.isasyncgenfunction(meth) or :
try: if _treat_as_stream:
with cancel_scope:
# stream values upward
async with from_aio:
async for item in from_aio:
yield item
if cancel_scope.cancelled_caught: task, from_aio, to_trio = _run_asyncio_task(
# always raise from any captured asyncio error func,
if aio_err: qsize=2**8,
raise aio_err **kwargs,
)
except BaseException as err: return from_aio
if aio_err is not None:
# always raise from any captured asyncio error
raise err from aio_err
else:
raise
return stream_results() # async def stream_results():
# try:
# with cancel_scope:
# # stream values upward
# async with from_aio:
# async for item in from_aio:
# yield item
# if cancel_scope.cancelled_caught:
# # always raise from any captured asyncio error
# if aio_err:
# raise aio_err
# except BaseException as err:
# if aio_err is not None:
# # always raise from any captured asyncio error
# raise err from aio_err
# else:
# raise
# finally:
# # breakpoint()
# task.cancel()
# return stream_results()
# simple async func # simple async func
try: try:
with cancel_scope: task, from_aio, to_trio = _run_asyncio_task(
# return single value func,
return await from_aio.receive() qsize=1,
**kwargs,
)
if cancel_scope.cancelled_caught: # with cancel_scope:
# always raise from any captured asyncio error # async with from_aio:
if aio_err: # return single value
raise aio_err return await from_aio.receive()
# if cancel_scope.cancelled_caught:
# # always raise from any captured asyncio error
# if aio_err:
# raise aio_err
# Do we need this? # Do we need this?
except BaseException as err: except BaseException as err:
# await tractor.breakpoint()
aio_err = from_aio._err
if aio_err is not None: if aio_err is not None:
# always raise from any captured asyncio error # always raise from any captured asyncio error
raise err from aio_err raise err from aio_err
else: else:
raise raise
finally:
task.cancel()
# async def stream_from_task
# pass
def run_as_asyncio_guest( def run_as_asyncio_guest(
trio_main: Callable, trio_main: Callable,