diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index ef7d595..92320a0 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -83,7 +83,7 @@ def _run_asyncio_task( coro = func(**kwargs) - # cancel_scope = trio.CancelScope() + cancel_scope = trio.CancelScope() # start the asyncio task we submitted from trio if inspect.isawaitable(coro): @@ -109,12 +109,13 @@ def _run_asyncio_task( if aio_err: log.exception(f"asyncio task errorred:\n{aio_err}") - # cancel_scope.cancel() + cancel_scope.cancel() from_aio._err = aio_err + from_aio.close() task.add_done_callback(cancel_trio) - return task, from_aio, to_trio + return task, from_aio, to_trio, cancel_scope async def run_task( @@ -183,7 +184,7 @@ async def run_task( # if inspect.isasyncgenfunction(meth) or : if _treat_as_stream: - task, from_aio, to_trio = _run_asyncio_task( + task, from_aio, to_trio, cs = _run_asyncio_task( func, qsize=2**8, **kwargs, @@ -218,7 +219,7 @@ async def run_task( # simple async func try: - task, from_aio, to_trio = _run_asyncio_task( + task, from_aio, to_trio, cs = _run_asyncio_task( func, qsize=1, **kwargs, @@ -227,12 +228,13 @@ async def run_task( # with cancel_scope: # async with from_aio: # return single value - return await from_aio.receive() + with cs: + return await from_aio.receive() - # if cancel_scope.cancelled_caught: - # # always raise from any captured asyncio error - # if aio_err: - # raise aio_err + if cs.cancelled_caught: + # always raise from any captured asyncio error + if from_aio._err: + raise from_aio._err # Do we need this? except Exception as err: