Compare commits

..

4 Commits

Author SHA1 Message Date
Tyler Goodlet 7fac170f8d Add equiv of `AsyncioCancelled` for aio side
Such that a `TrioCancelled` is raised in the aio task via
`.set_exception()` to explicitly indicate and allow that task to handle
a taskc request from the parent `trio.Task`.
2025-03-03 12:13:25 -05:00
Tyler Goodlet 44281268a8 More `debug_mode` test support, better nursery var names 2025-03-03 12:11:50 -05:00
Tyler Goodlet 064c5ce034 Add per-side graceful-exit/cancel excs-as-signals
Such that any combination of task terminations/exits can be explicitly
handled and "dual side independent" crash cases re-raised in egs.

The main error-or-exit impl changes include,

- use of new per-side "signaling exceptions":
  - TrioTaskExited|TrioCancelled for signalling aio.
  - AsyncioTaskExited|AsyncioCancelled for signalling trio.

- NOT overloading the `LinkedTaskChannel._trio/aio_err` fields for
  err-as-signal relay and instead add a new pair of
  `._trio/aio_to_raise` maybe-exc-attrs which allow each side's
  task to specify what it would want the other side to raise to signal
  its/a termination outcome:
  - `._trio_to_raise: AsyncioTaskExited|AsyncioCancelled` to signal,
    |_ the aio task having returned while the trio side was still reading
       from the `asyncio.Queue` or is just not `.done()`.
    |_ the aio task being self or trio-request cancelled where
       a `asyncio.CancelledError` is raised and caught but NOT relayed
       as is back to trio; instead signal a "more explicit" exc type.
  - `._aio_to_raise: TrioTaskExited|TrioCancelled` to signal,
    |_ the trio task having returned while the aio side was still reading
       from the mem chan and indicating that the trio side might not
       care any more about future streamed values (like the
       `Stop/EndOfChannel` equivs for ipc `Context`s).
    |_ when the trio task canceld we do
        a `asyncio.Future.set_exception(TrioTaskExited())` to indicate
        to the aio side verbosely that it should cancel due to the trio
        parent.
  - `_aio/trio_err` are now left to only capturing the **actual**
    per-side task excs for introspection / other side's handling logic.

- supporting "graceful exits" depending on API in use from
  `translate_aio_errors()` such that if either side exits but the other
  side isn't expect to consume the final `return`ed value, we just exit
  silently, which required:
  - adding a `suppress_graceful_exits: bool` flag.
  - adjusting the `maybe_raise_aio_side_err()` logic to use that flag
    and suppress only on certain combos of `._trio_to_raise/._trio_err`.
  - prefer to raise `._trio_to_raise` when the aio-side is the src and
    vice versa.

- filling out pedantic logging for cancellation cases indicating which
  side is the cause.

- add a `LinkedTaskChannel._aio_result` modelled after our
  `Context._result` a a similar `.wait_for_result()` interface which
  allows maybe accessing the aio task's final return value if desired
  when using the `open_channel_from()` API.

- rename `cancel_trio()` done handler -> `signal_trio_when_done()`

Also some fairly major test suite updates,
- add a `delay: int` producing fixture which delivers a much larger
  timeout whenever `debug_mode` is set so that the REPL can be used
  without a surrounding cancel firing.
- add a new `test_aio_exits_early_relays_AsyncioTaskExited` including
  a paired `exit_early: bool` flag to `push_from_aio_task()`.
- adjust `test_trio_closes_early_causes_aio_checkpoint_raise` to expect
  a `to_asyncio.TrioTaskExited`.
2025-03-03 12:00:14 -05:00
Tyler Goodlet 6637473b54 Expose `._state.debug_mode()` predicate at top level 2025-03-03 11:16:47 -05:00
4 changed files with 179 additions and 81 deletions

View File

@ -32,6 +32,16 @@ from tractor.trionics import BroadcastReceiver
from tractor._testing import expect_ctxc
@pytest.fixture(
scope='module',
)
def delay(debug_mode: bool) -> int:
if debug_mode:
return 999
else:
return 1
async def sleep_and_err(
sleep_for: float = 0.1,
@ -59,19 +69,23 @@ async def trio_cancels_single_aio_task():
await tractor.to_asyncio.run_task(aio_sleep_forever)
def test_trio_cancels_aio_on_actor_side(reg_addr):
def test_trio_cancels_aio_on_actor_side(
reg_addr: tuple[str, int],
delay: int,
debug_mode: bool,
):
'''
Spawn an infected actor that is cancelled by the ``trio`` side
task using std cancel scope apis.
'''
async def main():
# with trio.fail_after(1):
with trio.fail_after(999):
with trio.fail_after(1 + delay):
async with tractor.open_nursery(
registry_addrs=[reg_addr]
) as n:
await n.run_in_actor(
registry_addrs=[reg_addr],
debug_mode=debug_mode,
) as an:
await an.run_in_actor(
trio_cancels_single_aio_task,
infect_asyncio=True,
)
@ -118,7 +132,10 @@ async def asyncio_actor(
raise
def test_aio_simple_error(reg_addr):
def test_aio_simple_error(
reg_addr: tuple[str, int],
debug_mode: bool,
):
'''
Verify a simple remote asyncio error propagates back through trio
to the parent actor.
@ -127,9 +144,10 @@ def test_aio_simple_error(reg_addr):
'''
async def main():
async with tractor.open_nursery(
registry_addrs=[reg_addr]
) as n:
await n.run_in_actor(
registry_addrs=[reg_addr],
debug_mode=debug_mode,
) as an:
await an.run_in_actor(
asyncio_actor,
target='sleep_and_err',
expect_err='AssertionError',
@ -155,14 +173,19 @@ def test_aio_simple_error(reg_addr):
assert err.boxed_type is AssertionError
def test_tractor_cancels_aio(reg_addr):
def test_tractor_cancels_aio(
reg_addr: tuple[str, int],
debug_mode: bool,
):
'''
Verify we can cancel a spawned asyncio task gracefully.
'''
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.run_in_actor(
asyncio_actor,
target='aio_sleep_forever',
expect_err='trio.Cancelled',
@ -174,7 +197,9 @@ def test_tractor_cancels_aio(reg_addr):
trio.run(main)
def test_trio_cancels_aio(reg_addr):
def test_trio_cancels_aio(
reg_addr: tuple[str, int],
):
'''
Much like the above test with ``tractor.Portal.cancel_actor()``
except we just use a standard ``trio`` cancellation api.
@ -205,7 +230,8 @@ async def trio_ctx(
# this will block until the ``asyncio`` task sends a "first"
# message.
with trio.fail_after(2):
delay: int = 999 if tractor.debug_mode() else 1
with trio.fail_after(1 + delay):
try:
async with (
trio.open_nursery(
@ -241,8 +267,10 @@ async def trio_ctx(
ids='parent_actor_cancels_child={}'.format
)
def test_context_spawns_aio_task_that_errors(
reg_addr,
reg_addr: tuple[str, int],
delay: int,
parent_cancels: bool,
debug_mode: bool,
):
'''
Verify that spawning a task via an intertask channel ctx mngr that
@ -251,14 +279,13 @@ def test_context_spawns_aio_task_that_errors(
'''
async def main():
with trio.fail_after(2):
# with trio.fail_after(999):
async with tractor.open_nursery() as n:
p = await n.start_actor(
with trio.fail_after(1 + delay):
async with tractor.open_nursery() as an:
p = await an.start_actor(
'aio_daemon',
enable_modules=[__name__],
infect_asyncio=True,
# debug_mode=True,
debug_mode=debug_mode,
loglevel='cancel',
)
async with (
@ -325,11 +352,12 @@ async def aio_cancel():
def test_aio_cancelled_from_aio_causes_trio_cancelled(
reg_addr: tuple,
delay: int,
):
'''
When the `asyncio.Task` cancels itself the `trio` side cshould
When the `asyncio.Task` cancels itself the `trio` side should
also cancel and teardown and relay the cancellation cross-process
to the caller (parent).
to the parent caller.
'''
async def main():
@ -345,8 +373,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(
# NOTE: normally the `an.__aexit__()` waits on the
# portal's result but we do it explicitly here
# to avoid indent levels.
# with trio.fail_after(1):
with trio.fail_after(999):
with trio.fail_after(1 + delay):
await p.wait_for_result()
with pytest.raises(
@ -397,6 +424,7 @@ async def push_from_aio_task(
i == 50
):
if fail_early:
print('Raising exc from aio side!')
raise Exception
if exit_early:
@ -425,7 +453,7 @@ async def push_from_aio_task(
async def stream_from_aio(
trio_exit_early: bool = False,
raise_err: bool = False,
trio_raise_err: bool = False,
aio_raise_err: bool = False,
aio_exit_early: bool = False,
fan_out: bool = False,
@ -440,10 +468,18 @@ async def stream_from_aio(
async with to_asyncio.open_channel_from(
push_from_aio_task,
sequence=seq,
expect_cancel=raise_err or trio_exit_early,
expect_cancel=trio_raise_err or trio_exit_early,
fail_early=aio_raise_err,
exit_early=aio_exit_early,
# such that we can test exit early cases
# for each side explicitly.
suppress_graceful_exits=(not(
aio_exit_early
or
trio_exit_early
))
) as (first, chan):
assert first is True
@ -459,7 +495,7 @@ async def stream_from_aio(
pulled.append(value)
if value == 50:
if raise_err:
if trio_raise_err:
raise Exception
elif trio_exit_early:
print('`consume()` breaking early!\n')
@ -478,11 +514,11 @@ async def stream_from_aio(
# tasks are joined..
chan.subscribe() as br,
trio.open_nursery() as n,
trio.open_nursery() as tn,
):
# start 2nd task that get's broadcast the same
# value set.
n.start_soon(consume, br)
tn.start_soon(consume, br)
await consume(chan)
else:
@ -495,14 +531,14 @@ async def stream_from_aio(
finally:
if (
not raise_err
and
not trio_exit_early
and
not aio_raise_err
and
not aio_exit_early
if not (
trio_raise_err
or
trio_exit_early
or
aio_raise_err
or
aio_exit_early
):
if fan_out:
# we get double the pulled values in the
@ -526,10 +562,13 @@ async def stream_from_aio(
'fan_out', [False, True],
ids='fan_out_w_chan_subscribe={}'.format
)
def test_basic_interloop_channel_stream(reg_addr, fan_out):
def test_basic_interloop_channel_stream(
reg_addr: tuple[str, int],
fan_out: bool,
):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
async with tractor.open_nursery() as an:
portal = await an.run_in_actor(
stream_from_aio,
infect_asyncio=True,
fan_out=fan_out,
@ -543,10 +582,10 @@ def test_basic_interloop_channel_stream(reg_addr, fan_out):
# TODO: parametrize the above test and avoid the duplication here?
def test_trio_error_cancels_intertask_chan(reg_addr):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
async with tractor.open_nursery() as an:
portal = await an.run_in_actor(
stream_from_aio,
raise_err=True,
trio_raise_err=True,
infect_asyncio=True,
)
# should trigger remote actor error
@ -561,6 +600,8 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
def test_trio_closes_early_causes_aio_checkpoint_raise(
reg_addr: tuple[str, int],
delay: int,
debug_mode: bool,
):
'''
Check that if the `trio`-task "exits early and silently" (in this
@ -572,13 +613,12 @@ def test_trio_closes_early_causes_aio_checkpoint_raise(
'''
async def main():
with trio.fail_after(2):
# with trio.fail_after(999):
with trio.fail_after(1 + delay):
async with tractor.open_nursery(
# debug_mode=True,
debug_mode=debug_mode,
# enable_stack_on_sig=True,
) as n:
portal = await n.run_in_actor(
) as an:
portal = await an.run_in_actor(
stream_from_aio,
trio_exit_early=True,
infect_asyncio=True,
@ -606,6 +646,8 @@ def test_aio_exits_early_relays_AsyncioTaskExited(
# - trio cancelled AND aio exits early on its next tick
# - trio errors AND aio exits early on its next tick
reg_addr: tuple[str, int],
debug_mode: bool,
delay: int,
):
'''
Check that if the `asyncio`-task "exits early and silently" (in this
@ -621,10 +663,9 @@ def test_aio_exits_early_relays_AsyncioTaskExited(
'''
async def main():
with trio.fail_after(2):
# with trio.fail_after(999):
with trio.fail_after(1 + delay):
async with tractor.open_nursery(
# debug_mode=True,
debug_mode=debug_mode,
# enable_stack_on_sig=True,
) as an:
portal = await an.run_in_actor(
@ -658,10 +699,15 @@ def test_aio_exits_early_relays_AsyncioTaskExited(
assert exc.boxed_type is to_asyncio.AsyncioTaskExited
def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
def test_aio_errors_and_channel_propagates_and_closes(
reg_addr: tuple[str, int],
debug_mode: bool,
):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.run_in_actor(
stream_from_aio,
aio_raise_err=True,
infect_asyncio=True,
@ -736,13 +782,15 @@ async def trio_to_aio_echo_server(
ids='raise_error={}'.format,
)
def test_echoserver_detailed_mechanics(
reg_addr,
reg_addr: tuple[str, int],
debug_mode: bool,
raise_error_mid_stream,
):
async def main():
async with tractor.open_nursery() as n:
p = await n.start_actor(
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
p = await an.start_actor(
'aio_server',
enable_modules=[__name__],
infect_asyncio=True,
@ -947,6 +995,8 @@ def test_sigint_closes_lifetime_stack(
'''
async def main():
delay = 999 if tractor.debug_mode() else 1
try:
an: tractor.ActorNursery
async with tractor.open_nursery(
@ -997,7 +1047,7 @@ def test_sigint_closes_lifetime_stack(
if wait_for_ctx:
print('waiting for ctx outcome in parent..')
try:
with trio.fail_after(1):
with trio.fail_after(1 + delay):
await ctx.wait_for_result()
except tractor.ContextCancelled as ctxc:
assert ctxc.canceller == ctx.chan.uid

View File

@ -43,6 +43,7 @@ from ._state import (
current_actor as current_actor,
is_root_process as is_root_process,
current_ipc_ctx as current_ipc_ctx,
debug_mode as debug_mode
)
from ._exceptions import (
ContextCancelled as ContextCancelled,
@ -65,3 +66,4 @@ from ._root import (
from ._ipc import Channel as Channel
from ._portal import Portal as Portal
from ._runtime import Actor as Actor
from . import hilevel as hilevel

View File

@ -103,7 +103,16 @@ class AsyncioTaskExited(Exception):
'''
class TrioTaskExited(AsyncioCancelled):
class TrioCancelled(Exception):
'''
Trio cancelled translation (non-base) error
for use with the `to_asyncio` module
to be raised in the `asyncio.Task` to indicate
that the `trio` side raised `Cancelled` or an error.
'''
class TrioTaskExited(Exception):
'''
The `trio`-side task exited without explicitly cancelling the
`asyncio.Task` peer.
@ -406,6 +415,9 @@ class RemoteActorError(Exception):
String-name of the (last hop's) boxed error type.
'''
# TODO, maybe support also serializing the
# `ExceptionGroup.exeptions: list[BaseException]` set under
# certain conditions?
bt: Type[BaseException] = self.boxed_type
if bt:
return str(bt.__name__)
@ -821,8 +833,11 @@ class MsgTypeError(
'''
if (
(_bad_msg := self.msgdata.get('_bad_msg'))
and
isinstance(_bad_msg, PayloadMsg)
and (
isinstance(_bad_msg, PayloadMsg)
or
isinstance(_bad_msg, msgtypes.Start)
)
):
return _bad_msg

View File

@ -116,6 +116,8 @@ class LinkedTaskChannel(
_trio_task: trio.Task
_aio_task_complete: trio.Event
_suppress_graceful_exits: bool = True
_trio_err: BaseException|None = None
_trio_to_raise: (
AsyncioTaskExited| # aio task exits while trio ongoing
@ -229,6 +231,8 @@ class LinkedTaskChannel(
except BaseException as err:
async with translate_aio_errors(
chan=self,
# NOTE, determined by `open_channel_from()` input arg
suppress_graceful_exits=self._suppress_graceful_exits,
# XXX: obviously this will deadlock if an on-going stream is
# being procesed.
@ -300,6 +304,7 @@ def _run_asyncio_task(
*,
qsize: int = 1,
provide_channels: bool = False,
suppress_graceful_exits: bool = True,
hide_tb: bool = False,
**kwargs,
@ -352,6 +357,7 @@ def _run_asyncio_task(
_trio_cs=trio_cs,
_trio_task=trio_task,
_aio_task_complete=aio_task_complete,
_suppress_graceful_exits=suppress_graceful_exits,
)
async def wait_on_coro_final_result(
@ -365,7 +371,6 @@ def _run_asyncio_task(
`return`-ed result back to `trio`.
'''
nonlocal aio_err
nonlocal chan
orig = result = id(coro)
@ -383,7 +388,6 @@ def _run_asyncio_task(
'`asyncio` task errored\n'
)
raise
else:
if (
result != orig
@ -419,7 +423,11 @@ def _run_asyncio_task(
# - raise error and aio side errors "independently"
# on next tick (SEE draft HANDLER BELOW).
stats: trio.MemoryChannelStatistics = to_trio.statistics()
if stats.tasks_waiting_receive:
if (
stats.tasks_waiting_receive
and
not chan._aio_err
):
chan._trio_to_raise = AsyncioTaskExited(
f'Task existed with final result\n'
f'{result!r}\n'
@ -513,6 +521,9 @@ def _run_asyncio_task(
f')> {res}\n'
f' |_{task}\n'
)
if not chan._aio_result:
chan._aio_result = res
# ?TODO, should we also raise `AsyncioTaskExited[res]`
# in any case where trio is NOT blocking on the
# `._to_trio` chan?
@ -593,6 +604,7 @@ def _run_asyncio_task(
# is trio the src of the aio task's exc-as-outcome?
trio_err: BaseException|None = chan._trio_err
curr_aio_err: BaseException|None = chan._aio_err
if (
curr_aio_err
or
@ -648,6 +660,8 @@ def _run_asyncio_task(
trio_to_raise: AsyncioCancelled|AsyncioTaskExited = chan._trio_to_raise
aio_to_raise: TrioTaskExited|TrioCancelled = chan._aio_to_raise
if (
not chan._aio_result
and
not trio_cs.cancelled_caught
and (
(aio_err and type(aio_err) not in {
@ -680,7 +694,17 @@ def _run_asyncio_task(
# match the one we just caught from the task above!
# (that would indicate something weird/very-wrong
# going on?)
if aio_err is not trio_to_raise:
if (
aio_err is not trio_to_raise
and (
not suppress_graceful_exits
and (
chan._aio_result is not Unresolved
and
isinstance(trio_to_raise, AsyncioTaskExited)
)
)
):
# raise aio_err from relayed_aio_err
raise trio_to_raise from curr_aio_err
@ -715,6 +739,7 @@ async def translate_aio_errors(
trio_task = trio.lowlevel.current_task()
aio_err: BaseException|None = chan._aio_err
aio_task: asyncio.Task = chan._aio_task
aio_done_before_trio: bool = aio_task.done()
assert aio_task
trio_err: BaseException|None = None
to_raise_trio: BaseException|None = None
@ -1011,6 +1036,8 @@ async def translate_aio_errors(
# -[ ] make this a channel method, OR
# -[ ] just put back inline below?
#
# await tractor.pause(shield=True)
# TODO, go back to inlining this..
def maybe_raise_aio_side_err(
trio_err: Exception,
) -> None:
@ -1028,11 +1055,10 @@ async def translate_aio_errors(
None
) = chan._trio_to_raise
if (
trio_to_raise
and
suppress_graceful_exits
):
if not suppress_graceful_exits:
raise trio_to_raise from (aio_err or trio_err)
if trio_to_raise:
# import pdbp; pdbp.set_trace()
match (
trio_to_raise,
@ -1040,7 +1066,7 @@ async def translate_aio_errors(
):
case (
AsyncioTaskExited(),
None|trio.Cancelled(),
trio.Cancelled()|None,
):
log.info(
'Ignoring aio exit signal since trio also exited!'
@ -1051,12 +1077,13 @@ async def translate_aio_errors(
AsyncioCancelled(),
trio.Cancelled(),
):
log.info(
'Ignoring aio cancelled signal since trio was also cancelled!'
)
return
raise trio_to_raise from (aio_err or trio_err)
if not aio_done_before_trio:
log.info(
'Ignoring aio cancelled signal since trio was also cancelled!'
)
return
case _:
raise trio_to_raise from (aio_err or trio_err)
# Check if the asyncio-side is the cause of the trio-side
# error.
@ -1128,6 +1155,7 @@ async def run_task(
async with translate_aio_errors(
chan,
wait_on_aio_task=True,
suppress_graceful_exits=chan._suppress_graceful_exits,
):
# return single value that is the output from the
# ``asyncio`` function-as-task. Expect the mem chan api
@ -1143,7 +1171,8 @@ async def run_task(
async def open_channel_from(
target: Callable[..., Any],
**kwargs,
suppress_graceful_exits: bool = True,
**target_kwargs,
) -> AsyncIterator[Any]:
'''
@ -1155,13 +1184,15 @@ async def open_channel_from(
target,
qsize=2**8,
provide_channels=True,
**kwargs,
suppress_graceful_exits=suppress_graceful_exits,
**target_kwargs,
)
# TODO, tuple form here?
async with chan._from_aio:
async with translate_aio_errors(
chan,
wait_on_aio_task=True,
suppress_graceful_exits=suppress_graceful_exits,
):
# sync to a "started()"-like first delivered value from the
# ``asyncio`` task.