Compare commits

..

No commits in common. "e1bacaf4f4223d40e9b6869fdc9138ac0773f6f9" and "55809dac51ca9f1f343468dfa4c4e8369fc7e6bf" have entirely different histories.

5 changed files with 123 additions and 137 deletions

View File

@ -491,13 +491,7 @@ async def stream_from_aio(
], ],
): ):
async for value in chan: async for value in chan:
print(f'trio received: {value!r}') print(f'trio received {value}')
# XXX, debugging EoC not being handled correctly
# in `transate_aio_errors()`..
# if value is None:
# await tractor.pause(shield=True)
pulled.append(value) pulled.append(value)
if value == 50: if value == 50:
@ -739,13 +733,7 @@ async def aio_echo_server(
to_trio.send_nowait('start') to_trio.send_nowait('start')
while True: while True:
try: msg = await from_trio.get()
msg = await from_trio.get()
except to_asyncio.TrioTaskExited:
print(
'breaking aio echo loop due to `trio` exit!'
)
break
# echo the msg back # echo the msg back
to_trio.send_nowait(msg) to_trio.send_nowait(msg)

View File

@ -39,7 +39,7 @@ def test_infected_root_actor(
''' '''
async def _trio_main(): async def _trio_main():
with trio.fail_after(2 if not debug_mode else 999): with trio.fail_after(2):
first: str first: str
chan: to_asyncio.LinkedTaskChannel chan: to_asyncio.LinkedTaskChannel
async with ( async with (
@ -59,11 +59,7 @@ def test_infected_root_actor(
assert out == i assert out == i
print(f'asyncio echoing {i}') print(f'asyncio echoing {i}')
if ( if raise_error_mid_stream and i == 500:
raise_error_mid_stream
and
i == 500
):
raise raise_error_mid_stream raise raise_error_mid_stream
if out is None: if out is None:

View File

@ -64,9 +64,7 @@ def test_stashed_child_nursery(use_start_soon):
async def main(): async def main():
async with ( async with (
trio.open_nursery( trio.open_nursery() as pn,
strict_exception_groups=False,
) as pn,
): ):
cn = await pn.start(mk_child_nursery) cn = await pn.start(mk_child_nursery)
assert cn assert cn

View File

@ -2287,13 +2287,6 @@ def _set_trace(
repl.set_trace(frame=caller_frame) repl.set_trace(frame=caller_frame)
# XXX TODO! XXX, ensure `pytest -s` doesn't just
# hang on this being called in a test.. XD
# -[ ] maybe something in our test suite or is there
# some way we can detect output capture is enabled
# from the process itself?
# |_ronny: ?
#
async def pause( async def pause(
*, *,
hide_tb: bool = True, hide_tb: bool = True,
@ -3201,15 +3194,6 @@ async def maybe_wait_for_debugger(
return False return False
class BoxedMaybeException(Struct):
'''
Box a maybe-exception for post-crash introspection usage
from the body of a `open_crash_handler()` scope.
'''
value: BaseException|None = None
# TODO: better naming and what additionals? # TODO: better naming and what additionals?
# - [ ] optional runtime plugging? # - [ ] optional runtime plugging?
# - [ ] detection for sync vs. async code? # - [ ] detection for sync vs. async code?
@ -3240,6 +3224,9 @@ def open_crash_handler(
''' '''
__tracebackhide__: bool = tb_hide __tracebackhide__: bool = tb_hide
class BoxedMaybeException(Struct):
value: BaseException|None = None
# TODO, yield a `outcome.Error`-like boxed type? # TODO, yield a `outcome.Error`-like boxed type?
# -[~] use `outcome.Value/Error` X-> frozen! # -[~] use `outcome.Value/Error` X-> frozen!
# -[x] write our own..? # -[x] write our own..?
@ -3281,8 +3268,6 @@ def open_crash_handler(
def maybe_open_crash_handler( def maybe_open_crash_handler(
pdb: bool = False, pdb: bool = False,
tb_hide: bool = True, tb_hide: bool = True,
**kwargs,
): ):
''' '''
Same as `open_crash_handler()` but with bool input flag Same as `open_crash_handler()` but with bool input flag
@ -3293,11 +3278,9 @@ def maybe_open_crash_handler(
''' '''
__tracebackhide__: bool = tb_hide __tracebackhide__: bool = tb_hide
rtctx = nullcontext( rtctx = nullcontext
enter_result=BoxedMaybeException()
)
if pdb: if pdb:
rtctx = open_crash_handler(**kwargs) rtctx = open_crash_handler
with rtctx as boxed_maybe_exc: with rtctx():
yield boxed_maybe_exc yield

View File

@ -348,6 +348,7 @@ def _run_asyncio_task(
trio_task: trio.Task = trio.lowlevel.current_task() trio_task: trio.Task = trio.lowlevel.current_task()
trio_cs = trio.CancelScope() trio_cs = trio.CancelScope()
aio_task_complete = trio.Event() aio_task_complete = trio.Event()
aio_err: BaseException|None = None
chan = LinkedTaskChannel( chan = LinkedTaskChannel(
_to_aio=aio_q, # asyncio.Queue _to_aio=aio_q, # asyncio.Queue
@ -391,7 +392,7 @@ def _run_asyncio_task(
if ( if (
result != orig result != orig
and and
chan._aio_err is None aio_err is None
and and
# in the `open_channel_from()` case we don't # in the `open_channel_from()` case we don't
@ -428,7 +429,8 @@ def _run_asyncio_task(
not chan._aio_err not chan._aio_err
): ):
chan._trio_to_raise = AsyncioTaskExited( chan._trio_to_raise = AsyncioTaskExited(
f'Task exited with final result: {result!r}\n' f'Task existed with final result\n'
f'{result!r}\n'
) )
# only close the sender side which will relay # only close the sender side which will relay
@ -740,6 +742,7 @@ async def translate_aio_errors(
aio_done_before_trio: bool = aio_task.done() aio_done_before_trio: bool = aio_task.done()
assert aio_task assert aio_task
trio_err: BaseException|None = None trio_err: BaseException|None = None
to_raise_trio: BaseException|None = None
try: try:
yield # back to one of the cross-loop apis yield # back to one of the cross-loop apis
except trio.Cancelled as taskc: except trio.Cancelled as taskc:
@ -775,9 +778,8 @@ async def translate_aio_errors(
# called from `LinkedTaskChannel.receive()` which we want # called from `LinkedTaskChannel.receive()` which we want
# passthrough and further we have no special meaning for it in # passthrough and further we have no special meaning for it in
# terms of relaying errors or signals from the aio side! # terms of relaying errors or signals from the aio side!
except trio.EndOfChannel as eoc: except trio.EndOfChannel:
trio_err = chan._trio_err = eoc raise
raise eoc
# NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio # NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio
# task-done-callback. # task-done-callback.
@ -823,7 +825,7 @@ async def translate_aio_errors(
raise cre raise cre
except BaseException as _trio_err: except BaseException as _trio_err:
trio_err = chan._trio_err = _trio_err trio_err = chan._trio_err = trio_err
# await tractor.pause(shield=True) # workx! # await tractor.pause(shield=True) # workx!
entered: bool = await _debug._maybe_enter_pm( entered: bool = await _debug._maybe_enter_pm(
trio_err, trio_err,
@ -864,27 +866,25 @@ async def translate_aio_errors(
f'The `trio`-side task crashed!\n' f'The `trio`-side task crashed!\n'
f'{trio_err}' f'{trio_err}'
) )
# ??TODO? move this into the func that tries to use aio_task.set_exception(aio_taskc)
# `Task._fut_waiter: Future` instead?? wait_on_aio_task = False
# # try:
# aio_task.set_exception(aio_taskc) # aio_task.set_exception(aio_taskc)
# wait_on_aio_task = False # except (
try: # asyncio.InvalidStateError,
aio_task.set_exception(aio_taskc) # RuntimeError,
except ( # # ^XXX, uhh bc apparently we can't use `.set_exception()`
asyncio.InvalidStateError, # # any more XD .. ??
RuntimeError, # ):
# ^XXX, uhh bc apparently we can't use `.set_exception()` # wait_on_aio_task = False
# any more XD .. ??
):
wait_on_aio_task = False
finally: finally:
# record wtv `trio`-side error transpired # record wtv `trio`-side error transpired
if trio_err: if trio_err:
assert chan._trio_err is trio_err if chan._trio_err is not trio_err:
# if chan._trio_err is not trio_err: await tractor.pause(shield=True)
# await tractor.pause(shield=True)
# assert chan._trio_err is trio_err
ya_trio_exited: bool = chan._trio_exited ya_trio_exited: bool = chan._trio_exited
graceful_trio_exit: bool = ( graceful_trio_exit: bool = (
@ -1032,83 +1032,104 @@ async def translate_aio_errors(
'asyncio-task is done and unblocked trio-side!\n' 'asyncio-task is done and unblocked trio-side!\n'
) )
# NOTE, was a `maybe_raise_aio_side_err()` closure that # TODO?
# i moved inline BP # -[ ] make this a channel method, OR
''' # -[ ] just put back inline below?
Raise any `trio`-side-caused cancellation or legit task #
error normally propagated from the caller of either, # await tractor.pause(shield=True)
- `open_channel_from()` # TODO, go back to inlining this..
- `run_task()` def maybe_raise_aio_side_err(
trio_err: Exception,
) -> None:
'''
Raise any `trio`-side-caused cancellation or legit task
error normally propagated from the caller of either,
- `open_channel_from()`
- `run_task()`
''' '''
aio_err: BaseException|None = chan._aio_err aio_err: BaseException|None = chan._aio_err
trio_to_raise: ( trio_to_raise: (
AsyncioCancelled| AsyncioCancelled|
AsyncioTaskExited| AsyncioTaskExited|
None None
) = chan._trio_to_raise ) = chan._trio_to_raise
if not suppress_graceful_exits: if not suppress_graceful_exits:
raise trio_to_raise from (aio_err or trio_err) raise trio_to_raise from (aio_err or trio_err)
if trio_to_raise: if trio_to_raise:
match ( # import pdbp; pdbp.set_trace()
trio_to_raise, match (
trio_err, trio_to_raise,
): trio_err,
case (
AsyncioTaskExited(),
trio.Cancelled()|
None,
): ):
log.info( case (
'Ignoring aio exit signal since trio also exited!' AsyncioTaskExited(),
) trio.Cancelled()|None,
return ):
case (
AsyncioTaskExited(),
trio.EndOfChannel(),
):
raise trio_err
case (
AsyncioCancelled(),
trio.Cancelled(),
):
if not aio_done_before_trio:
log.info( log.info(
'Ignoring aio cancelled signal since trio was also cancelled!' 'Ignoring aio exit signal since trio also exited!'
) )
return return
case _:
raise trio_to_raise from (aio_err or trio_err)
# Check if the asyncio-side is the cause of the trio-side case (
# error. AsyncioCancelled(),
elif ( trio.Cancelled(),
aio_err is not None ):
and if not aio_done_before_trio:
type(aio_err) is not AsyncioCancelled log.info(
): 'Ignoring aio cancelled signal since trio was also cancelled!'
# always raise from any captured asyncio error )
return
case _:
raise trio_to_raise from (aio_err or trio_err)
# Check if the asyncio-side is the cause of the trio-side
# error.
elif (
aio_err is not None
and
type(aio_err) is not AsyncioCancelled
# and (
# type(aio_err) is not AsyncioTaskExited
# and
# not ya_trio_exited
# and
# not trio_err
# )
# TODO, case where trio_err is not None and
# aio_err is AsyncioTaskExited => raise eg!
# -[ ] maybe use a match bc this get's real
# complex fast XD
#
# or
# type(aio_err) is not AsyncioTaskExited
# and
# trio_err
# )
):
# always raise from any captured asyncio error
if trio_err:
raise trio_err from aio_err
# XXX NOTE! above in the `trio.ClosedResourceError`
# handler we specifically set the
# `aio_err = AsyncioCancelled` such that it is raised
# as that special exc here!
raise aio_err
if trio_err: if trio_err:
raise trio_err from aio_err raise trio_err
# XXX NOTE! above in the `trio.ClosedResourceError` # await tractor.pause()
# handler we specifically set the # NOTE: if any ``asyncio`` error was caught, raise it here inline
# `aio_err = AsyncioCancelled` such that it is raised # here in the ``trio`` task
# as that special exc here! # if trio_err:
raise aio_err maybe_raise_aio_side_err(
trio_err=to_raise_trio or trio_err
if trio_err: )
raise trio_err
# ^^TODO?? case where trio_err is not None and
# aio_err is AsyncioTaskExited => raise eg!
# -[x] maybe use a match bc this get's real
# complex fast XD
# => i did this above for silent exit cases ya?
async def run_task( async def run_task(