From 7cc25fc99f0942e0f0cb1e98cf9c551fe3a96028 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 27 Apr 2021 12:20:33 -0400 Subject: [PATCH] WIP redo asyncio async gen streaming --- tractor/_root.py | 6 +- tractor/to_asyncio.py | 167 ++++++++++++++++++++++++++++++++++-------- 2 files changed, 139 insertions(+), 34 deletions(-) diff --git a/tractor/_root.py b/tractor/_root.py index bd4b33c..3d27adb 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -234,7 +234,7 @@ def run( def run_daemon( - rpc_module_paths: List[str], + enable_modules: List[str], **kwargs ) -> None: """Spawn daemon actor which will respond to RPC. @@ -243,9 +243,9 @@ def run_daemon( ``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned is meant to run forever responding to RPC requests. """ - kwargs['rpc_module_paths'] = list(rpc_module_paths) + kwargs['enable_modules'] = list(enable_modules) - for path in rpc_module_paths: + for path in enable_modules: importlib.import_module(path) return run(partial(trio.sleep, float('inf')), **kwargs) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 6e46903..cd761c9 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -43,15 +43,16 @@ async def consume_asyncgen( to_trio.send_nowait(item) -async def run_task( +def _run_asyncio_task( func: Callable, *, - qsize: int = 2**10, + qsize: int = 1, _treat_as_stream: bool = False, **kwargs, ) -> Any: """Run an ``asyncio`` async function or generator in a task, return or stream the result back to ``trio``. + """ assert current_actor().is_infected_aio() @@ -59,29 +60,38 @@ async def run_task( from_trio = asyncio.Queue(qsize) # type: ignore to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore + from_aio._err = None + args = tuple(inspect.getfullargspec(func).args) if getattr(func, '_tractor_steam_function', None): # the assumption is that the target async routine accepts the # send channel then it intends to yield more then one return # value otherwise it would just return ;P - _treat_as_stream = True + # _treat_as_stream = True + assert qsize > 1 # allow target func to accept/stream results manually by name if 'to_trio' in args: kwargs['to_trio'] = to_trio + if 'from_trio' in args: kwargs['from_trio'] = from_trio + # if 'from_aio' in args: + # kwargs['from_aio'] = from_aio + coro = func(**kwargs) - cancel_scope = trio.CancelScope() + # cancel_scope = trio.CancelScope() # start the asyncio task we submitted from trio if inspect.isawaitable(coro): task = asyncio.create_task(run_coro(to_trio, coro)) + elif inspect.isasyncgen(coro): task = asyncio.create_task(consume_asyncgen(to_trio, coro)) + else: raise TypeError(f"No support for invoking {coro}") @@ -96,54 +106,149 @@ async def run_task( if aio_err: log.exception(f"asyncio task errorred:\n{aio_err}") - cancel_scope.cancel() + # cancel_scope.cancel() + from_aio._err = aio_err + to_trio.close() task.add_done_callback(cancel_trio) + return task, from_aio, to_trio + + +async def run_task( + func: Callable, + *, + qsize: int = 2**10, + _treat_as_stream: bool = False, + **kwargs, +) -> Any: + """Run an ``asyncio`` async function or generator in a task, return + or stream the result back to ``trio``. + + """ + # assert current_actor().is_infected_aio() + + # # ITC (inter task comms) + # from_trio = asyncio.Queue(qsize) # type: ignore + # to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore + + # args = tuple(inspect.getfullargspec(func).args) + + # if getattr(func, '_tractor_steam_function', None): + # # the assumption is that the target async routine accepts the + # # send channel then it intends to yield more then one return + # # value otherwise it would just return ;P + # _treat_as_stream = True + + # # allow target func to accept/stream results manually by name + # if 'to_trio' in args: + # kwargs['to_trio'] = to_trio + # if 'from_trio' in args: + # kwargs['from_trio'] = from_trio + + # coro = func(**kwargs) + + # cancel_scope = trio.CancelScope() + + # # start the asyncio task we submitted from trio + # if inspect.isawaitable(coro): + # task = asyncio.create_task(run_coro(to_trio, coro)) + + # elif inspect.isasyncgen(coro): + # task = asyncio.create_task(consume_asyncgen(to_trio, coro)) + + # else: + # raise TypeError(f"No support for invoking {coro}") + + # aio_err = None + + # def cancel_trio(task): + # """Cancel the calling ``trio`` task on error. + # """ + # nonlocal aio_err + # aio_err = task.exception() + + # if aio_err: + # log.exception(f"asyncio task errorred:\n{aio_err}") + + # cancel_scope.cancel() + + # task.add_done_callback(cancel_trio) + # async iterator - if inspect.isasyncgen(coro) or _treat_as_stream: + # if inspect.isasyncgen(coro) or _treat_as_stream: - async def stream_results(): - try: - with cancel_scope: - # stream values upward - async with from_aio: - async for item in from_aio: - yield item + # if inspect.isasyncgenfunction(meth) or : + if _treat_as_stream: - if cancel_scope.cancelled_caught: - # always raise from any captured asyncio error - if aio_err: - raise aio_err + task, from_aio, to_trio = _run_asyncio_task( + func, + qsize=2**8, + **kwargs, + ) - except BaseException as err: - if aio_err is not None: - # always raise from any captured asyncio error - raise err from aio_err - else: - raise + return from_aio - return stream_results() + # async def stream_results(): + # try: + # with cancel_scope: + # # stream values upward + # async with from_aio: + # async for item in from_aio: + # yield item + + # if cancel_scope.cancelled_caught: + # # always raise from any captured asyncio error + # if aio_err: + # raise aio_err + + # except BaseException as err: + # if aio_err is not None: + # # always raise from any captured asyncio error + # raise err from aio_err + # else: + # raise + # finally: + # # breakpoint() + # task.cancel() + + # return stream_results() # simple async func try: - with cancel_scope: - # return single value - return await from_aio.receive() + task, from_aio, to_trio = _run_asyncio_task( + func, + qsize=1, + **kwargs, + ) - if cancel_scope.cancelled_caught: - # always raise from any captured asyncio error - if aio_err: - raise aio_err + # with cancel_scope: + # async with from_aio: + # return single value + return await from_aio.receive() + + # if cancel_scope.cancelled_caught: + # # always raise from any captured asyncio error + # if aio_err: + # raise aio_err # Do we need this? except BaseException as err: + # await tractor.breakpoint() + aio_err = from_aio._err if aio_err is not None: # always raise from any captured asyncio error raise err from aio_err else: raise + finally: + task.cancel() + + +# async def stream_from_task +# pass + def run_as_asyncio_guest( trio_main: Callable,