From 6da2c3a885a5a28d60489c707d7487fd137a6ee7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 2 Aug 2021 12:36:40 -0400 Subject: [PATCH] Drop old implementation cruft --- tractor/to_asyncio.py | 121 +++++++++++------------------------------- 1 file changed, 31 insertions(+), 90 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 92320a0..c7bcc89 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -1,6 +1,7 @@ -""" +''' Infection apis for ``asyncio`` loops running ``trio`` using guest mode. -""" + +''' import asyncio import inspect from typing import ( @@ -119,69 +120,20 @@ def _run_asyncio_task( async def run_task( + func: Callable, *, + qsize: int = 2**10, _treat_as_stream: bool = False, **kwargs, + ) -> Any: """Run an ``asyncio`` async function or generator in a task, return or stream the result back to ``trio``. """ - # assert current_actor().is_infected_aio() - - # # ITC (inter task comms) - # from_trio = asyncio.Queue(qsize) # type: ignore - # to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore - - # args = tuple(inspect.getfullargspec(func).args) - - # if getattr(func, '_tractor_steam_function', None): - # # the assumption is that the target async routine accepts the - # # send channel then it intends to yield more then one return - # # value otherwise it would just return ;P - # _treat_as_stream = True - - # # allow target func to accept/stream results manually by name - # if 'to_trio' in args: - # kwargs['to_trio'] = to_trio - # if 'from_trio' in args: - # kwargs['from_trio'] = from_trio - - # coro = func(**kwargs) - - # cancel_scope = trio.CancelScope() - - # # start the asyncio task we submitted from trio - # if inspect.isawaitable(coro): - # task = asyncio.create_task(run_coro(to_trio, coro)) - - # elif inspect.isasyncgen(coro): - # task = asyncio.create_task(consume_asyncgen(to_trio, coro)) - - # else: - # raise TypeError(f"No support for invoking {coro}") - - # aio_err = None - - # def cancel_trio(task): - # """Cancel the calling ``trio`` task on error. - # """ - # nonlocal aio_err - # aio_err = task.exception() - - # if aio_err: - # log.exception(f"asyncio task errorred:\n{aio_err}") - - # cancel_scope.cancel() - - # task.add_done_callback(cancel_trio) - - # async iterator - # if inspect.isasyncgen(coro) or _treat_as_stream: - - # if inspect.isasyncgenfunction(meth) or : + # streaming ``asyncio`` task if _treat_as_stream: task, from_aio, to_trio, cs = _run_asyncio_task( @@ -190,33 +142,10 @@ async def run_task( **kwargs, ) + # naively expect the mem chan api to do the job + # of handling cross-framework cancellations / errors return from_aio - # async def stream_results(): - # try: - # with cancel_scope: - # # stream values upward - # async with from_aio: - # async for item in from_aio: - # yield item - - # if cancel_scope.cancelled_caught: - # # always raise from any captured asyncio error - # if aio_err: - # raise aio_err - - # except BaseException as err: - # if aio_err is not None: - # # always raise from any captured asyncio error - # raise err from aio_err - # else: - # raise - # finally: - # # breakpoint() - # task.cancel() - - # return stream_results() - # simple async func try: task, from_aio, to_trio, cs = _run_asyncio_task( @@ -225,9 +154,7 @@ async def run_task( **kwargs, ) - # with cancel_scope: - # async with from_aio: - # return single value + # return single value with cs: return await from_aio.receive() @@ -238,18 +165,14 @@ async def run_task( # Do we need this? except Exception as err: - # await tractor.breakpoint() + aio_err = from_aio._err - # try: if aio_err is not None: # always raise from any captured asyncio error raise err from aio_err else: raise - # finally: - # if not task.done(): - # task.cancel() except trio.Cancelled: if not task.done(): @@ -258,8 +181,26 @@ async def run_task( raise -# async def stream_from_task -# pass +# TODO: explicit api for the streaming case where +# we pull from the mem chan in an async generator? +# This ends up looking more like our ``Portal.open_stream_from()`` +# NB: code below is untested. + +# @asynccontextmanager +# async def stream_from_task( + +# target: Callable[Any], +# **kwargs, + +# ) -> AsyncIterator[Any]: + +# from_aoi = await run_task(target, _treat_as_stream=True, **kwargs) + +# with cancel_scope: +# # stream values upward +# async with from_aio: +# async for item in from_aio: +# yield item def run_as_asyncio_guest(