diff --git a/tractor/_root.py b/tractor/_root.py index 3468a25..797e736 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -103,7 +103,6 @@ async def open_root_actor( _default_arbiter_port, ) - if loglevel is None: loglevel = log.get_loglevel() else: diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 6e46903..cd761c9 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -43,15 +43,16 @@ async def consume_asyncgen( to_trio.send_nowait(item) -async def run_task( +def _run_asyncio_task( func: Callable, *, - qsize: int = 2**10, + qsize: int = 1, _treat_as_stream: bool = False, **kwargs, ) -> Any: """Run an ``asyncio`` async function or generator in a task, return or stream the result back to ``trio``. + """ assert current_actor().is_infected_aio() @@ -59,29 +60,38 @@ async def run_task( from_trio = asyncio.Queue(qsize) # type: ignore to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore + from_aio._err = None + args = tuple(inspect.getfullargspec(func).args) if getattr(func, '_tractor_steam_function', None): # the assumption is that the target async routine accepts the # send channel then it intends to yield more then one return # value otherwise it would just return ;P - _treat_as_stream = True + # _treat_as_stream = True + assert qsize > 1 # allow target func to accept/stream results manually by name if 'to_trio' in args: kwargs['to_trio'] = to_trio + if 'from_trio' in args: kwargs['from_trio'] = from_trio + # if 'from_aio' in args: + # kwargs['from_aio'] = from_aio + coro = func(**kwargs) - cancel_scope = trio.CancelScope() + # cancel_scope = trio.CancelScope() # start the asyncio task we submitted from trio if inspect.isawaitable(coro): task = asyncio.create_task(run_coro(to_trio, coro)) + elif inspect.isasyncgen(coro): task = asyncio.create_task(consume_asyncgen(to_trio, coro)) + else: raise TypeError(f"No support for invoking {coro}") @@ -96,54 +106,149 @@ async def run_task( if aio_err: log.exception(f"asyncio task errorred:\n{aio_err}") - cancel_scope.cancel() + # cancel_scope.cancel() + from_aio._err = aio_err + to_trio.close() task.add_done_callback(cancel_trio) + return task, from_aio, to_trio + + +async def run_task( + func: Callable, + *, + qsize: int = 2**10, + _treat_as_stream: bool = False, + **kwargs, +) -> Any: + """Run an ``asyncio`` async function or generator in a task, return + or stream the result back to ``trio``. + + """ + # assert current_actor().is_infected_aio() + + # # ITC (inter task comms) + # from_trio = asyncio.Queue(qsize) # type: ignore + # to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore + + # args = tuple(inspect.getfullargspec(func).args) + + # if getattr(func, '_tractor_steam_function', None): + # # the assumption is that the target async routine accepts the + # # send channel then it intends to yield more then one return + # # value otherwise it would just return ;P + # _treat_as_stream = True + + # # allow target func to accept/stream results manually by name + # if 'to_trio' in args: + # kwargs['to_trio'] = to_trio + # if 'from_trio' in args: + # kwargs['from_trio'] = from_trio + + # coro = func(**kwargs) + + # cancel_scope = trio.CancelScope() + + # # start the asyncio task we submitted from trio + # if inspect.isawaitable(coro): + # task = asyncio.create_task(run_coro(to_trio, coro)) + + # elif inspect.isasyncgen(coro): + # task = asyncio.create_task(consume_asyncgen(to_trio, coro)) + + # else: + # raise TypeError(f"No support for invoking {coro}") + + # aio_err = None + + # def cancel_trio(task): + # """Cancel the calling ``trio`` task on error. + # """ + # nonlocal aio_err + # aio_err = task.exception() + + # if aio_err: + # log.exception(f"asyncio task errorred:\n{aio_err}") + + # cancel_scope.cancel() + + # task.add_done_callback(cancel_trio) + # async iterator - if inspect.isasyncgen(coro) or _treat_as_stream: + # if inspect.isasyncgen(coro) or _treat_as_stream: - async def stream_results(): - try: - with cancel_scope: - # stream values upward - async with from_aio: - async for item in from_aio: - yield item + # if inspect.isasyncgenfunction(meth) or : + if _treat_as_stream: - if cancel_scope.cancelled_caught: - # always raise from any captured asyncio error - if aio_err: - raise aio_err + task, from_aio, to_trio = _run_asyncio_task( + func, + qsize=2**8, + **kwargs, + ) - except BaseException as err: - if aio_err is not None: - # always raise from any captured asyncio error - raise err from aio_err - else: - raise + return from_aio - return stream_results() + # async def stream_results(): + # try: + # with cancel_scope: + # # stream values upward + # async with from_aio: + # async for item in from_aio: + # yield item + + # if cancel_scope.cancelled_caught: + # # always raise from any captured asyncio error + # if aio_err: + # raise aio_err + + # except BaseException as err: + # if aio_err is not None: + # # always raise from any captured asyncio error + # raise err from aio_err + # else: + # raise + # finally: + # # breakpoint() + # task.cancel() + + # return stream_results() # simple async func try: - with cancel_scope: - # return single value - return await from_aio.receive() + task, from_aio, to_trio = _run_asyncio_task( + func, + qsize=1, + **kwargs, + ) - if cancel_scope.cancelled_caught: - # always raise from any captured asyncio error - if aio_err: - raise aio_err + # with cancel_scope: + # async with from_aio: + # return single value + return await from_aio.receive() + + # if cancel_scope.cancelled_caught: + # # always raise from any captured asyncio error + # if aio_err: + # raise aio_err # Do we need this? except BaseException as err: + # await tractor.breakpoint() + aio_err = from_aio._err if aio_err is not None: # always raise from any captured asyncio error raise err from aio_err else: raise + finally: + task.cancel() + + +# async def stream_from_task +# pass + def run_as_asyncio_guest( trio_main: Callable,