Compare commits
No commits in common. "e1bacaf4f4223d40e9b6869fdc9138ac0773f6f9" and "55809dac51ca9f1f343468dfa4c4e8369fc7e6bf" have entirely different histories.
e1bacaf4f4
...
55809dac51
|
@ -491,13 +491,7 @@ async def stream_from_aio(
|
||||||
],
|
],
|
||||||
):
|
):
|
||||||
async for value in chan:
|
async for value in chan:
|
||||||
print(f'trio received: {value!r}')
|
print(f'trio received {value}')
|
||||||
|
|
||||||
# XXX, debugging EoC not being handled correctly
|
|
||||||
# in `transate_aio_errors()`..
|
|
||||||
# if value is None:
|
|
||||||
# await tractor.pause(shield=True)
|
|
||||||
|
|
||||||
pulled.append(value)
|
pulled.append(value)
|
||||||
|
|
||||||
if value == 50:
|
if value == 50:
|
||||||
|
@ -739,13 +733,7 @@ async def aio_echo_server(
|
||||||
to_trio.send_nowait('start')
|
to_trio.send_nowait('start')
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
msg = await from_trio.get()
|
||||||
msg = await from_trio.get()
|
|
||||||
except to_asyncio.TrioTaskExited:
|
|
||||||
print(
|
|
||||||
'breaking aio echo loop due to `trio` exit!'
|
|
||||||
)
|
|
||||||
break
|
|
||||||
|
|
||||||
# echo the msg back
|
# echo the msg back
|
||||||
to_trio.send_nowait(msg)
|
to_trio.send_nowait(msg)
|
||||||
|
|
|
@ -39,7 +39,7 @@ def test_infected_root_actor(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def _trio_main():
|
async def _trio_main():
|
||||||
with trio.fail_after(2 if not debug_mode else 999):
|
with trio.fail_after(2):
|
||||||
first: str
|
first: str
|
||||||
chan: to_asyncio.LinkedTaskChannel
|
chan: to_asyncio.LinkedTaskChannel
|
||||||
async with (
|
async with (
|
||||||
|
@ -59,11 +59,7 @@ def test_infected_root_actor(
|
||||||
assert out == i
|
assert out == i
|
||||||
print(f'asyncio echoing {i}')
|
print(f'asyncio echoing {i}')
|
||||||
|
|
||||||
if (
|
if raise_error_mid_stream and i == 500:
|
||||||
raise_error_mid_stream
|
|
||||||
and
|
|
||||||
i == 500
|
|
||||||
):
|
|
||||||
raise raise_error_mid_stream
|
raise raise_error_mid_stream
|
||||||
|
|
||||||
if out is None:
|
if out is None:
|
||||||
|
|
|
@ -64,9 +64,7 @@ def test_stashed_child_nursery(use_start_soon):
|
||||||
async def main():
|
async def main():
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery(
|
trio.open_nursery() as pn,
|
||||||
strict_exception_groups=False,
|
|
||||||
) as pn,
|
|
||||||
):
|
):
|
||||||
cn = await pn.start(mk_child_nursery)
|
cn = await pn.start(mk_child_nursery)
|
||||||
assert cn
|
assert cn
|
||||||
|
|
|
@ -2287,13 +2287,6 @@ def _set_trace(
|
||||||
repl.set_trace(frame=caller_frame)
|
repl.set_trace(frame=caller_frame)
|
||||||
|
|
||||||
|
|
||||||
# XXX TODO! XXX, ensure `pytest -s` doesn't just
|
|
||||||
# hang on this being called in a test.. XD
|
|
||||||
# -[ ] maybe something in our test suite or is there
|
|
||||||
# some way we can detect output capture is enabled
|
|
||||||
# from the process itself?
|
|
||||||
# |_ronny: ?
|
|
||||||
#
|
|
||||||
async def pause(
|
async def pause(
|
||||||
*,
|
*,
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = True,
|
||||||
|
@ -3201,15 +3194,6 @@ async def maybe_wait_for_debugger(
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
class BoxedMaybeException(Struct):
|
|
||||||
'''
|
|
||||||
Box a maybe-exception for post-crash introspection usage
|
|
||||||
from the body of a `open_crash_handler()` scope.
|
|
||||||
|
|
||||||
'''
|
|
||||||
value: BaseException|None = None
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: better naming and what additionals?
|
# TODO: better naming and what additionals?
|
||||||
# - [ ] optional runtime plugging?
|
# - [ ] optional runtime plugging?
|
||||||
# - [ ] detection for sync vs. async code?
|
# - [ ] detection for sync vs. async code?
|
||||||
|
@ -3240,6 +3224,9 @@ def open_crash_handler(
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = tb_hide
|
__tracebackhide__: bool = tb_hide
|
||||||
|
|
||||||
|
class BoxedMaybeException(Struct):
|
||||||
|
value: BaseException|None = None
|
||||||
|
|
||||||
# TODO, yield a `outcome.Error`-like boxed type?
|
# TODO, yield a `outcome.Error`-like boxed type?
|
||||||
# -[~] use `outcome.Value/Error` X-> frozen!
|
# -[~] use `outcome.Value/Error` X-> frozen!
|
||||||
# -[x] write our own..?
|
# -[x] write our own..?
|
||||||
|
@ -3281,8 +3268,6 @@ def open_crash_handler(
|
||||||
def maybe_open_crash_handler(
|
def maybe_open_crash_handler(
|
||||||
pdb: bool = False,
|
pdb: bool = False,
|
||||||
tb_hide: bool = True,
|
tb_hide: bool = True,
|
||||||
|
|
||||||
**kwargs,
|
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Same as `open_crash_handler()` but with bool input flag
|
Same as `open_crash_handler()` but with bool input flag
|
||||||
|
@ -3293,11 +3278,9 @@ def maybe_open_crash_handler(
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = tb_hide
|
__tracebackhide__: bool = tb_hide
|
||||||
|
|
||||||
rtctx = nullcontext(
|
rtctx = nullcontext
|
||||||
enter_result=BoxedMaybeException()
|
|
||||||
)
|
|
||||||
if pdb:
|
if pdb:
|
||||||
rtctx = open_crash_handler(**kwargs)
|
rtctx = open_crash_handler
|
||||||
|
|
||||||
with rtctx as boxed_maybe_exc:
|
with rtctx():
|
||||||
yield boxed_maybe_exc
|
yield
|
||||||
|
|
|
@ -348,6 +348,7 @@ def _run_asyncio_task(
|
||||||
trio_task: trio.Task = trio.lowlevel.current_task()
|
trio_task: trio.Task = trio.lowlevel.current_task()
|
||||||
trio_cs = trio.CancelScope()
|
trio_cs = trio.CancelScope()
|
||||||
aio_task_complete = trio.Event()
|
aio_task_complete = trio.Event()
|
||||||
|
aio_err: BaseException|None = None
|
||||||
|
|
||||||
chan = LinkedTaskChannel(
|
chan = LinkedTaskChannel(
|
||||||
_to_aio=aio_q, # asyncio.Queue
|
_to_aio=aio_q, # asyncio.Queue
|
||||||
|
@ -391,7 +392,7 @@ def _run_asyncio_task(
|
||||||
if (
|
if (
|
||||||
result != orig
|
result != orig
|
||||||
and
|
and
|
||||||
chan._aio_err is None
|
aio_err is None
|
||||||
and
|
and
|
||||||
|
|
||||||
# in the `open_channel_from()` case we don't
|
# in the `open_channel_from()` case we don't
|
||||||
|
@ -428,7 +429,8 @@ def _run_asyncio_task(
|
||||||
not chan._aio_err
|
not chan._aio_err
|
||||||
):
|
):
|
||||||
chan._trio_to_raise = AsyncioTaskExited(
|
chan._trio_to_raise = AsyncioTaskExited(
|
||||||
f'Task exited with final result: {result!r}\n'
|
f'Task existed with final result\n'
|
||||||
|
f'{result!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# only close the sender side which will relay
|
# only close the sender side which will relay
|
||||||
|
@ -740,6 +742,7 @@ async def translate_aio_errors(
|
||||||
aio_done_before_trio: bool = aio_task.done()
|
aio_done_before_trio: bool = aio_task.done()
|
||||||
assert aio_task
|
assert aio_task
|
||||||
trio_err: BaseException|None = None
|
trio_err: BaseException|None = None
|
||||||
|
to_raise_trio: BaseException|None = None
|
||||||
try:
|
try:
|
||||||
yield # back to one of the cross-loop apis
|
yield # back to one of the cross-loop apis
|
||||||
except trio.Cancelled as taskc:
|
except trio.Cancelled as taskc:
|
||||||
|
@ -775,9 +778,8 @@ async def translate_aio_errors(
|
||||||
# called from `LinkedTaskChannel.receive()` which we want
|
# called from `LinkedTaskChannel.receive()` which we want
|
||||||
# passthrough and further we have no special meaning for it in
|
# passthrough and further we have no special meaning for it in
|
||||||
# terms of relaying errors or signals from the aio side!
|
# terms of relaying errors or signals from the aio side!
|
||||||
except trio.EndOfChannel as eoc:
|
except trio.EndOfChannel:
|
||||||
trio_err = chan._trio_err = eoc
|
raise
|
||||||
raise eoc
|
|
||||||
|
|
||||||
# NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio
|
# NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio
|
||||||
# task-done-callback.
|
# task-done-callback.
|
||||||
|
@ -823,7 +825,7 @@ async def translate_aio_errors(
|
||||||
raise cre
|
raise cre
|
||||||
|
|
||||||
except BaseException as _trio_err:
|
except BaseException as _trio_err:
|
||||||
trio_err = chan._trio_err = _trio_err
|
trio_err = chan._trio_err = trio_err
|
||||||
# await tractor.pause(shield=True) # workx!
|
# await tractor.pause(shield=True) # workx!
|
||||||
entered: bool = await _debug._maybe_enter_pm(
|
entered: bool = await _debug._maybe_enter_pm(
|
||||||
trio_err,
|
trio_err,
|
||||||
|
@ -864,27 +866,25 @@ async def translate_aio_errors(
|
||||||
f'The `trio`-side task crashed!\n'
|
f'The `trio`-side task crashed!\n'
|
||||||
f'{trio_err}'
|
f'{trio_err}'
|
||||||
)
|
)
|
||||||
# ??TODO? move this into the func that tries to use
|
aio_task.set_exception(aio_taskc)
|
||||||
# `Task._fut_waiter: Future` instead??
|
wait_on_aio_task = False
|
||||||
#
|
# try:
|
||||||
# aio_task.set_exception(aio_taskc)
|
# aio_task.set_exception(aio_taskc)
|
||||||
# wait_on_aio_task = False
|
# except (
|
||||||
try:
|
# asyncio.InvalidStateError,
|
||||||
aio_task.set_exception(aio_taskc)
|
# RuntimeError,
|
||||||
except (
|
# # ^XXX, uhh bc apparently we can't use `.set_exception()`
|
||||||
asyncio.InvalidStateError,
|
# # any more XD .. ??
|
||||||
RuntimeError,
|
# ):
|
||||||
# ^XXX, uhh bc apparently we can't use `.set_exception()`
|
# wait_on_aio_task = False
|
||||||
# any more XD .. ??
|
|
||||||
):
|
|
||||||
wait_on_aio_task = False
|
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# record wtv `trio`-side error transpired
|
# record wtv `trio`-side error transpired
|
||||||
if trio_err:
|
if trio_err:
|
||||||
assert chan._trio_err is trio_err
|
if chan._trio_err is not trio_err:
|
||||||
# if chan._trio_err is not trio_err:
|
await tractor.pause(shield=True)
|
||||||
# await tractor.pause(shield=True)
|
|
||||||
|
# assert chan._trio_err is trio_err
|
||||||
|
|
||||||
ya_trio_exited: bool = chan._trio_exited
|
ya_trio_exited: bool = chan._trio_exited
|
||||||
graceful_trio_exit: bool = (
|
graceful_trio_exit: bool = (
|
||||||
|
@ -1032,83 +1032,104 @@ async def translate_aio_errors(
|
||||||
'asyncio-task is done and unblocked trio-side!\n'
|
'asyncio-task is done and unblocked trio-side!\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# NOTE, was a `maybe_raise_aio_side_err()` closure that
|
# TODO?
|
||||||
# i moved inline BP
|
# -[ ] make this a channel method, OR
|
||||||
'''
|
# -[ ] just put back inline below?
|
||||||
Raise any `trio`-side-caused cancellation or legit task
|
#
|
||||||
error normally propagated from the caller of either,
|
# await tractor.pause(shield=True)
|
||||||
- `open_channel_from()`
|
# TODO, go back to inlining this..
|
||||||
- `run_task()`
|
def maybe_raise_aio_side_err(
|
||||||
|
trio_err: Exception,
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Raise any `trio`-side-caused cancellation or legit task
|
||||||
|
error normally propagated from the caller of either,
|
||||||
|
- `open_channel_from()`
|
||||||
|
- `run_task()`
|
||||||
|
|
||||||
'''
|
'''
|
||||||
aio_err: BaseException|None = chan._aio_err
|
aio_err: BaseException|None = chan._aio_err
|
||||||
trio_to_raise: (
|
trio_to_raise: (
|
||||||
AsyncioCancelled|
|
AsyncioCancelled|
|
||||||
AsyncioTaskExited|
|
AsyncioTaskExited|
|
||||||
None
|
None
|
||||||
) = chan._trio_to_raise
|
) = chan._trio_to_raise
|
||||||
|
|
||||||
if not suppress_graceful_exits:
|
if not suppress_graceful_exits:
|
||||||
raise trio_to_raise from (aio_err or trio_err)
|
raise trio_to_raise from (aio_err or trio_err)
|
||||||
|
|
||||||
if trio_to_raise:
|
if trio_to_raise:
|
||||||
match (
|
# import pdbp; pdbp.set_trace()
|
||||||
trio_to_raise,
|
match (
|
||||||
trio_err,
|
trio_to_raise,
|
||||||
):
|
trio_err,
|
||||||
case (
|
|
||||||
AsyncioTaskExited(),
|
|
||||||
trio.Cancelled()|
|
|
||||||
None,
|
|
||||||
):
|
):
|
||||||
log.info(
|
case (
|
||||||
'Ignoring aio exit signal since trio also exited!'
|
AsyncioTaskExited(),
|
||||||
)
|
trio.Cancelled()|None,
|
||||||
return
|
):
|
||||||
|
|
||||||
case (
|
|
||||||
AsyncioTaskExited(),
|
|
||||||
trio.EndOfChannel(),
|
|
||||||
):
|
|
||||||
raise trio_err
|
|
||||||
|
|
||||||
case (
|
|
||||||
AsyncioCancelled(),
|
|
||||||
trio.Cancelled(),
|
|
||||||
):
|
|
||||||
if not aio_done_before_trio:
|
|
||||||
log.info(
|
log.info(
|
||||||
'Ignoring aio cancelled signal since trio was also cancelled!'
|
'Ignoring aio exit signal since trio also exited!'
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
case _:
|
|
||||||
raise trio_to_raise from (aio_err or trio_err)
|
|
||||||
|
|
||||||
# Check if the asyncio-side is the cause of the trio-side
|
case (
|
||||||
# error.
|
AsyncioCancelled(),
|
||||||
elif (
|
trio.Cancelled(),
|
||||||
aio_err is not None
|
):
|
||||||
and
|
if not aio_done_before_trio:
|
||||||
type(aio_err) is not AsyncioCancelled
|
log.info(
|
||||||
):
|
'Ignoring aio cancelled signal since trio was also cancelled!'
|
||||||
# always raise from any captured asyncio error
|
)
|
||||||
|
return
|
||||||
|
case _:
|
||||||
|
raise trio_to_raise from (aio_err or trio_err)
|
||||||
|
|
||||||
|
# Check if the asyncio-side is the cause of the trio-side
|
||||||
|
# error.
|
||||||
|
elif (
|
||||||
|
aio_err is not None
|
||||||
|
and
|
||||||
|
type(aio_err) is not AsyncioCancelled
|
||||||
|
# and (
|
||||||
|
# type(aio_err) is not AsyncioTaskExited
|
||||||
|
# and
|
||||||
|
# not ya_trio_exited
|
||||||
|
# and
|
||||||
|
# not trio_err
|
||||||
|
# )
|
||||||
|
|
||||||
|
# TODO, case where trio_err is not None and
|
||||||
|
# aio_err is AsyncioTaskExited => raise eg!
|
||||||
|
# -[ ] maybe use a match bc this get's real
|
||||||
|
# complex fast XD
|
||||||
|
#
|
||||||
|
# or
|
||||||
|
# type(aio_err) is not AsyncioTaskExited
|
||||||
|
# and
|
||||||
|
# trio_err
|
||||||
|
# )
|
||||||
|
):
|
||||||
|
# always raise from any captured asyncio error
|
||||||
|
if trio_err:
|
||||||
|
raise trio_err from aio_err
|
||||||
|
|
||||||
|
# XXX NOTE! above in the `trio.ClosedResourceError`
|
||||||
|
# handler we specifically set the
|
||||||
|
# `aio_err = AsyncioCancelled` such that it is raised
|
||||||
|
# as that special exc here!
|
||||||
|
raise aio_err
|
||||||
|
|
||||||
if trio_err:
|
if trio_err:
|
||||||
raise trio_err from aio_err
|
raise trio_err
|
||||||
|
|
||||||
# XXX NOTE! above in the `trio.ClosedResourceError`
|
# await tractor.pause()
|
||||||
# handler we specifically set the
|
# NOTE: if any ``asyncio`` error was caught, raise it here inline
|
||||||
# `aio_err = AsyncioCancelled` such that it is raised
|
# here in the ``trio`` task
|
||||||
# as that special exc here!
|
# if trio_err:
|
||||||
raise aio_err
|
maybe_raise_aio_side_err(
|
||||||
|
trio_err=to_raise_trio or trio_err
|
||||||
if trio_err:
|
)
|
||||||
raise trio_err
|
|
||||||
|
|
||||||
# ^^TODO?? case where trio_err is not None and
|
|
||||||
# aio_err is AsyncioTaskExited => raise eg!
|
|
||||||
# -[x] maybe use a match bc this get's real
|
|
||||||
# complex fast XD
|
|
||||||
# => i did this above for silent exit cases ya?
|
|
||||||
|
|
||||||
|
|
||||||
async def run_task(
|
async def run_task(
|
||||||
|
|
Loading…
Reference in New Issue