From 68e5c2a95f07c5d4d2c33b69528a02b89a099500 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 14 Oct 2020 12:51:41 -0400 Subject: [PATCH] Raise from asyncio error; fixes mypy --- tractor/to_asyncio.py | 50 +++++++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 59db280..85600d7 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -6,7 +6,7 @@ import inspect from typing import ( Any, Callable, - AsyncGenerator, + AsyncIterator, Awaitable, Union, ) @@ -33,7 +33,7 @@ async def run_coro( async def consume_asyncgen( to_trio: trio.MemorySendChannel, - coro: AsyncGenerator, + coro: AsyncIterator, ) -> None: """Stream async generator results back to ``trio``. @@ -50,7 +50,7 @@ async def run_task( qsize: int = 2**10, _treat_as_stream: bool = False, **kwargs, -) -> Union[AsyncGenerator, Any]: +) -> Any: """Run an ``asyncio`` async function or generator in a task, return or stream the result back to ``trio``. """ @@ -79,50 +79,58 @@ async def run_task( cancel_scope = trio.CancelScope() # start the asyncio task we submitted from trio - # TODO: try out ``anyio`` asyncio based tg here if inspect.isawaitable(coro): task = asyncio.create_task(run_coro(to_trio, coro)) elif inspect.isasyncgen(coro): task = asyncio.create_task(consume_asyncgen(to_trio, coro)) else: - raise TypeError(f"No support for {coro}") + raise TypeError(f"No support for invoking {coro}") - err = None + aio_err = None def cancel_trio(task): """Cancel the calling ``trio`` task on error. """ nonlocal err - err = task.exception() - if err: - log.exception(f"asyncio task errorred:\n{err}") + aio_err = task.exception() + if aio_err: + log.exception(f"asyncio task errorred:\n{aio_err}") cancel_scope.cancel() task.add_done_callback(cancel_trio) - # asycn gen + # async iterator if inspect.isasyncgen(coro) or _treat_as_stream: async def stream_results(): - with cancel_scope: - # stream values upward - async with from_aio: - async for item in from_aio: - yield item - if cancel_scope.cancelled_caught and err: - raise err + try: + with cancel_scope: + # stream values upward + async with from_aio: + async for item in from_aio: + yield item + except BaseException as err: + if aio_err is not None: + # always raise from any captured asyncio error + raise err from aio_err + else: + raise return stream_results() # simple async func - elif inspect.iscoroutine(coro): - + try: with cancel_scope: # return single value return await from_aio.receive() - if cancel_scope.cancelled_caught and err: - raise err + # Do we need this? + except BaseException as err: + if aio_err is not None: + # always raise from any captured asyncio error + raise err from aio_err + else: + raise def run_as_asyncio_guest(