forked from goodboy/tractor
1
0
Fork 0

Fix error propagation on asyncio streaming tasks

infect_asyncio
Tyler Goodlet 2021-07-28 12:32:46 -04:00
parent 55e210fec6
commit 325c0cdb1b
1 changed files with 12 additions and 10 deletions

View File

@ -83,7 +83,7 @@ def _run_asyncio_task(
coro = func(**kwargs)
# cancel_scope = trio.CancelScope()
cancel_scope = trio.CancelScope()
# start the asyncio task we submitted from trio
if inspect.isawaitable(coro):
@ -109,12 +109,13 @@ def _run_asyncio_task(
if aio_err:
log.exception(f"asyncio task errorred:\n{aio_err}")
# cancel_scope.cancel()
cancel_scope.cancel()
from_aio._err = aio_err
from_aio.close()
task.add_done_callback(cancel_trio)
return task, from_aio, to_trio
return task, from_aio, to_trio, cancel_scope
async def run_task(
@ -183,7 +184,7 @@ async def run_task(
# if inspect.isasyncgenfunction(meth) or :
if _treat_as_stream:
task, from_aio, to_trio = _run_asyncio_task(
task, from_aio, to_trio, cs = _run_asyncio_task(
func,
qsize=2**8,
**kwargs,
@ -218,7 +219,7 @@ async def run_task(
# simple async func
try:
task, from_aio, to_trio = _run_asyncio_task(
task, from_aio, to_trio, cs = _run_asyncio_task(
func,
qsize=1,
**kwargs,
@ -227,12 +228,13 @@ async def run_task(
# with cancel_scope:
# async with from_aio:
# return single value
with cs:
return await from_aio.receive()
# if cancel_scope.cancelled_caught:
# # always raise from any captured asyncio error
# if aio_err:
# raise aio_err
if cs.cancelled_caught:
# always raise from any captured asyncio error
if from_aio._err:
raise from_aio._err
# Do we need this?
except Exception as err: