forked from goodboy/tractor
Fix error propagation on asyncio streaming tasks
parent
a4859c969c
commit
2bd5ba76b9
|
@ -83,7 +83,7 @@ def _run_asyncio_task(
|
|||
|
||||
coro = func(**kwargs)
|
||||
|
||||
# cancel_scope = trio.CancelScope()
|
||||
cancel_scope = trio.CancelScope()
|
||||
|
||||
# start the asyncio task we submitted from trio
|
||||
if inspect.isawaitable(coro):
|
||||
|
@ -109,12 +109,13 @@ def _run_asyncio_task(
|
|||
if aio_err:
|
||||
log.exception(f"asyncio task errorred:\n{aio_err}")
|
||||
|
||||
# cancel_scope.cancel()
|
||||
cancel_scope.cancel()
|
||||
from_aio._err = aio_err
|
||||
from_aio.close()
|
||||
|
||||
task.add_done_callback(cancel_trio)
|
||||
|
||||
return task, from_aio, to_trio
|
||||
return task, from_aio, to_trio, cancel_scope
|
||||
|
||||
|
||||
async def run_task(
|
||||
|
@ -183,7 +184,7 @@ async def run_task(
|
|||
# if inspect.isasyncgenfunction(meth) or :
|
||||
if _treat_as_stream:
|
||||
|
||||
task, from_aio, to_trio = _run_asyncio_task(
|
||||
task, from_aio, to_trio, cs = _run_asyncio_task(
|
||||
func,
|
||||
qsize=2**8,
|
||||
**kwargs,
|
||||
|
@ -218,7 +219,7 @@ async def run_task(
|
|||
|
||||
# simple async func
|
||||
try:
|
||||
task, from_aio, to_trio = _run_asyncio_task(
|
||||
task, from_aio, to_trio, cs = _run_asyncio_task(
|
||||
func,
|
||||
qsize=1,
|
||||
**kwargs,
|
||||
|
@ -227,12 +228,13 @@ async def run_task(
|
|||
# with cancel_scope:
|
||||
# async with from_aio:
|
||||
# return single value
|
||||
return await from_aio.receive()
|
||||
with cs:
|
||||
return await from_aio.receive()
|
||||
|
||||
# if cancel_scope.cancelled_caught:
|
||||
# # always raise from any captured asyncio error
|
||||
# if aio_err:
|
||||
# raise aio_err
|
||||
if cs.cancelled_caught:
|
||||
# always raise from any captured asyncio error
|
||||
if from_aio._err:
|
||||
raise from_aio._err
|
||||
|
||||
# Do we need this?
|
||||
except Exception as err:
|
||||
|
|
Loading…
Reference in New Issue