Compare commits

..

1 Commits

Author SHA1 Message Date
Tyler Goodlet 4c82b6e94f TOAMMEND: Add exit per-side task exit and cancellation signals 2025-03-01 21:25:05 -05:00
4 changed files with 81 additions and 179 deletions

View File

@ -32,16 +32,6 @@ from tractor.trionics import BroadcastReceiver
from tractor._testing import expect_ctxc
@pytest.fixture(
scope='module',
)
def delay(debug_mode: bool) -> int:
if debug_mode:
return 999
else:
return 1
async def sleep_and_err(
sleep_for: float = 0.1,
@ -69,23 +59,19 @@ async def trio_cancels_single_aio_task():
await tractor.to_asyncio.run_task(aio_sleep_forever)
def test_trio_cancels_aio_on_actor_side(
reg_addr: tuple[str, int],
delay: int,
debug_mode: bool,
):
def test_trio_cancels_aio_on_actor_side(reg_addr):
'''
Spawn an infected actor that is cancelled by the ``trio`` side
task using std cancel scope apis.
'''
async def main():
with trio.fail_after(1 + delay):
# with trio.fail_after(1):
with trio.fail_after(999):
async with tractor.open_nursery(
registry_addrs=[reg_addr],
debug_mode=debug_mode,
) as an:
await an.run_in_actor(
registry_addrs=[reg_addr]
) as n:
await n.run_in_actor(
trio_cancels_single_aio_task,
infect_asyncio=True,
)
@ -132,10 +118,7 @@ async def asyncio_actor(
raise
def test_aio_simple_error(
reg_addr: tuple[str, int],
debug_mode: bool,
):
def test_aio_simple_error(reg_addr):
'''
Verify a simple remote asyncio error propagates back through trio
to the parent actor.
@ -144,10 +127,9 @@ def test_aio_simple_error(
'''
async def main():
async with tractor.open_nursery(
registry_addrs=[reg_addr],
debug_mode=debug_mode,
) as an:
await an.run_in_actor(
registry_addrs=[reg_addr]
) as n:
await n.run_in_actor(
asyncio_actor,
target='sleep_and_err',
expect_err='AssertionError',
@ -173,19 +155,14 @@ def test_aio_simple_error(
assert err.boxed_type is AssertionError
def test_tractor_cancels_aio(
reg_addr: tuple[str, int],
debug_mode: bool,
):
def test_tractor_cancels_aio(reg_addr):
'''
Verify we can cancel a spawned asyncio task gracefully.
'''
async def main():
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.run_in_actor(
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
asyncio_actor,
target='aio_sleep_forever',
expect_err='trio.Cancelled',
@ -197,9 +174,7 @@ def test_tractor_cancels_aio(
trio.run(main)
def test_trio_cancels_aio(
reg_addr: tuple[str, int],
):
def test_trio_cancels_aio(reg_addr):
'''
Much like the above test with ``tractor.Portal.cancel_actor()``
except we just use a standard ``trio`` cancellation api.
@ -230,8 +205,7 @@ async def trio_ctx(
# this will block until the ``asyncio`` task sends a "first"
# message.
delay: int = 999 if tractor.debug_mode() else 1
with trio.fail_after(1 + delay):
with trio.fail_after(2):
try:
async with (
trio.open_nursery(
@ -267,10 +241,8 @@ async def trio_ctx(
ids='parent_actor_cancels_child={}'.format
)
def test_context_spawns_aio_task_that_errors(
reg_addr: tuple[str, int],
delay: int,
reg_addr,
parent_cancels: bool,
debug_mode: bool,
):
'''
Verify that spawning a task via an intertask channel ctx mngr that
@ -279,13 +251,14 @@ def test_context_spawns_aio_task_that_errors(
'''
async def main():
with trio.fail_after(1 + delay):
async with tractor.open_nursery() as an:
p = await an.start_actor(
with trio.fail_after(2):
# with trio.fail_after(999):
async with tractor.open_nursery() as n:
p = await n.start_actor(
'aio_daemon',
enable_modules=[__name__],
infect_asyncio=True,
debug_mode=debug_mode,
# debug_mode=True,
loglevel='cancel',
)
async with (
@ -352,12 +325,11 @@ async def aio_cancel():
def test_aio_cancelled_from_aio_causes_trio_cancelled(
reg_addr: tuple,
delay: int,
):
'''
When the `asyncio.Task` cancels itself the `trio` side should
When the `asyncio.Task` cancels itself the `trio` side cshould
also cancel and teardown and relay the cancellation cross-process
to the parent caller.
to the caller (parent).
'''
async def main():
@ -373,7 +345,8 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(
# NOTE: normally the `an.__aexit__()` waits on the
# portal's result but we do it explicitly here
# to avoid indent levels.
with trio.fail_after(1 + delay):
# with trio.fail_after(1):
with trio.fail_after(999):
await p.wait_for_result()
with pytest.raises(
@ -424,7 +397,6 @@ async def push_from_aio_task(
i == 50
):
if fail_early:
print('Raising exc from aio side!')
raise Exception
if exit_early:
@ -453,7 +425,7 @@ async def push_from_aio_task(
async def stream_from_aio(
trio_exit_early: bool = False,
trio_raise_err: bool = False,
raise_err: bool = False,
aio_raise_err: bool = False,
aio_exit_early: bool = False,
fan_out: bool = False,
@ -468,18 +440,10 @@ async def stream_from_aio(
async with to_asyncio.open_channel_from(
push_from_aio_task,
sequence=seq,
expect_cancel=trio_raise_err or trio_exit_early,
expect_cancel=raise_err or trio_exit_early,
fail_early=aio_raise_err,
exit_early=aio_exit_early,
# such that we can test exit early cases
# for each side explicitly.
suppress_graceful_exits=(not(
aio_exit_early
or
trio_exit_early
))
) as (first, chan):
assert first is True
@ -495,7 +459,7 @@ async def stream_from_aio(
pulled.append(value)
if value == 50:
if trio_raise_err:
if raise_err:
raise Exception
elif trio_exit_early:
print('`consume()` breaking early!\n')
@ -514,11 +478,11 @@ async def stream_from_aio(
# tasks are joined..
chan.subscribe() as br,
trio.open_nursery() as tn,
trio.open_nursery() as n,
):
# start 2nd task that get's broadcast the same
# value set.
tn.start_soon(consume, br)
n.start_soon(consume, br)
await consume(chan)
else:
@ -531,14 +495,14 @@ async def stream_from_aio(
finally:
if not (
trio_raise_err
or
trio_exit_early
or
aio_raise_err
or
aio_exit_early
if (
not raise_err
and
not trio_exit_early
and
not aio_raise_err
and
not aio_exit_early
):
if fan_out:
# we get double the pulled values in the
@ -562,13 +526,10 @@ async def stream_from_aio(
'fan_out', [False, True],
ids='fan_out_w_chan_subscribe={}'.format
)
def test_basic_interloop_channel_stream(
reg_addr: tuple[str, int],
fan_out: bool,
):
def test_basic_interloop_channel_stream(reg_addr, fan_out):
async def main():
async with tractor.open_nursery() as an:
portal = await an.run_in_actor(
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
stream_from_aio,
infect_asyncio=True,
fan_out=fan_out,
@ -582,10 +543,10 @@ def test_basic_interloop_channel_stream(
# TODO: parametrize the above test and avoid the duplication here?
def test_trio_error_cancels_intertask_chan(reg_addr):
async def main():
async with tractor.open_nursery() as an:
portal = await an.run_in_actor(
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
stream_from_aio,
trio_raise_err=True,
raise_err=True,
infect_asyncio=True,
)
# should trigger remote actor error
@ -600,8 +561,6 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
def test_trio_closes_early_causes_aio_checkpoint_raise(
reg_addr: tuple[str, int],
delay: int,
debug_mode: bool,
):
'''
Check that if the `trio`-task "exits early and silently" (in this
@ -613,12 +572,13 @@ def test_trio_closes_early_causes_aio_checkpoint_raise(
'''
async def main():
with trio.fail_after(1 + delay):
with trio.fail_after(2):
# with trio.fail_after(999):
async with tractor.open_nursery(
debug_mode=debug_mode,
# debug_mode=True,
# enable_stack_on_sig=True,
) as an:
portal = await an.run_in_actor(
) as n:
portal = await n.run_in_actor(
stream_from_aio,
trio_exit_early=True,
infect_asyncio=True,
@ -646,8 +606,6 @@ def test_aio_exits_early_relays_AsyncioTaskExited(
# - trio cancelled AND aio exits early on its next tick
# - trio errors AND aio exits early on its next tick
reg_addr: tuple[str, int],
debug_mode: bool,
delay: int,
):
'''
Check that if the `asyncio`-task "exits early and silently" (in this
@ -663,9 +621,10 @@ def test_aio_exits_early_relays_AsyncioTaskExited(
'''
async def main():
with trio.fail_after(1 + delay):
with trio.fail_after(2):
# with trio.fail_after(999):
async with tractor.open_nursery(
debug_mode=debug_mode,
# debug_mode=True,
# enable_stack_on_sig=True,
) as an:
portal = await an.run_in_actor(
@ -699,15 +658,10 @@ def test_aio_exits_early_relays_AsyncioTaskExited(
assert exc.boxed_type is to_asyncio.AsyncioTaskExited
def test_aio_errors_and_channel_propagates_and_closes(
reg_addr: tuple[str, int],
debug_mode: bool,
):
def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
async def main():
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.run_in_actor(
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
stream_from_aio,
aio_raise_err=True,
infect_asyncio=True,
@ -782,15 +736,13 @@ async def trio_to_aio_echo_server(
ids='raise_error={}'.format,
)
def test_echoserver_detailed_mechanics(
reg_addr: tuple[str, int],
debug_mode: bool,
reg_addr,
raise_error_mid_stream,
):
async def main():
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
p = await an.start_actor(
async with tractor.open_nursery() as n:
p = await n.start_actor(
'aio_server',
enable_modules=[__name__],
infect_asyncio=True,
@ -995,8 +947,6 @@ def test_sigint_closes_lifetime_stack(
'''
async def main():
delay = 999 if tractor.debug_mode() else 1
try:
an: tractor.ActorNursery
async with tractor.open_nursery(
@ -1047,7 +997,7 @@ def test_sigint_closes_lifetime_stack(
if wait_for_ctx:
print('waiting for ctx outcome in parent..')
try:
with trio.fail_after(1 + delay):
with trio.fail_after(1):
await ctx.wait_for_result()
except tractor.ContextCancelled as ctxc:
assert ctxc.canceller == ctx.chan.uid

View File

@ -43,7 +43,6 @@ from ._state import (
current_actor as current_actor,
is_root_process as is_root_process,
current_ipc_ctx as current_ipc_ctx,
debug_mode as debug_mode
)
from ._exceptions import (
ContextCancelled as ContextCancelled,
@ -66,4 +65,3 @@ from ._root import (
from ._ipc import Channel as Channel
from ._portal import Portal as Portal
from ._runtime import Actor as Actor
from . import hilevel as hilevel

View File

@ -103,16 +103,7 @@ class AsyncioTaskExited(Exception):
'''
class TrioCancelled(Exception):
'''
Trio cancelled translation (non-base) error
for use with the `to_asyncio` module
to be raised in the `asyncio.Task` to indicate
that the `trio` side raised `Cancelled` or an error.
'''
class TrioTaskExited(Exception):
class TrioTaskExited(AsyncioCancelled):
'''
The `trio`-side task exited without explicitly cancelling the
`asyncio.Task` peer.
@ -415,9 +406,6 @@ class RemoteActorError(Exception):
String-name of the (last hop's) boxed error type.
'''
# TODO, maybe support also serializing the
# `ExceptionGroup.exeptions: list[BaseException]` set under
# certain conditions?
bt: Type[BaseException] = self.boxed_type
if bt:
return str(bt.__name__)
@ -833,11 +821,8 @@ class MsgTypeError(
'''
if (
(_bad_msg := self.msgdata.get('_bad_msg'))
and (
and
isinstance(_bad_msg, PayloadMsg)
or
isinstance(_bad_msg, msgtypes.Start)
)
):
return _bad_msg

View File

@ -116,8 +116,6 @@ class LinkedTaskChannel(
_trio_task: trio.Task
_aio_task_complete: trio.Event
_suppress_graceful_exits: bool = True
_trio_err: BaseException|None = None
_trio_to_raise: (
AsyncioTaskExited| # aio task exits while trio ongoing
@ -231,8 +229,6 @@ class LinkedTaskChannel(
except BaseException as err:
async with translate_aio_errors(
chan=self,
# NOTE, determined by `open_channel_from()` input arg
suppress_graceful_exits=self._suppress_graceful_exits,
# XXX: obviously this will deadlock if an on-going stream is
# being procesed.
@ -304,7 +300,6 @@ def _run_asyncio_task(
*,
qsize: int = 1,
provide_channels: bool = False,
suppress_graceful_exits: bool = True,
hide_tb: bool = False,
**kwargs,
@ -357,7 +352,6 @@ def _run_asyncio_task(
_trio_cs=trio_cs,
_trio_task=trio_task,
_aio_task_complete=aio_task_complete,
_suppress_graceful_exits=suppress_graceful_exits,
)
async def wait_on_coro_final_result(
@ -371,6 +365,7 @@ def _run_asyncio_task(
`return`-ed result back to `trio`.
'''
nonlocal aio_err
nonlocal chan
orig = result = id(coro)
@ -388,6 +383,7 @@ def _run_asyncio_task(
'`asyncio` task errored\n'
)
raise
else:
if (
result != orig
@ -423,11 +419,7 @@ def _run_asyncio_task(
# - raise error and aio side errors "independently"
# on next tick (SEE draft HANDLER BELOW).
stats: trio.MemoryChannelStatistics = to_trio.statistics()
if (
stats.tasks_waiting_receive
and
not chan._aio_err
):
if stats.tasks_waiting_receive:
chan._trio_to_raise = AsyncioTaskExited(
f'Task existed with final result\n'
f'{result!r}\n'
@ -521,9 +513,6 @@ def _run_asyncio_task(
f')> {res}\n'
f' |_{task}\n'
)
if not chan._aio_result:
chan._aio_result = res
# ?TODO, should we also raise `AsyncioTaskExited[res]`
# in any case where trio is NOT blocking on the
# `._to_trio` chan?
@ -604,7 +593,6 @@ def _run_asyncio_task(
# is trio the src of the aio task's exc-as-outcome?
trio_err: BaseException|None = chan._trio_err
curr_aio_err: BaseException|None = chan._aio_err
if (
curr_aio_err
or
@ -660,8 +648,6 @@ def _run_asyncio_task(
trio_to_raise: AsyncioCancelled|AsyncioTaskExited = chan._trio_to_raise
aio_to_raise: TrioTaskExited|TrioCancelled = chan._aio_to_raise
if (
not chan._aio_result
and
not trio_cs.cancelled_caught
and (
(aio_err and type(aio_err) not in {
@ -694,17 +680,7 @@ def _run_asyncio_task(
# match the one we just caught from the task above!
# (that would indicate something weird/very-wrong
# going on?)
if (
aio_err is not trio_to_raise
and (
not suppress_graceful_exits
and (
chan._aio_result is not Unresolved
and
isinstance(trio_to_raise, AsyncioTaskExited)
)
)
):
if aio_err is not trio_to_raise:
# raise aio_err from relayed_aio_err
raise trio_to_raise from curr_aio_err
@ -739,7 +715,6 @@ async def translate_aio_errors(
trio_task = trio.lowlevel.current_task()
aio_err: BaseException|None = chan._aio_err
aio_task: asyncio.Task = chan._aio_task
aio_done_before_trio: bool = aio_task.done()
assert aio_task
trio_err: BaseException|None = None
to_raise_trio: BaseException|None = None
@ -1036,8 +1011,6 @@ async def translate_aio_errors(
# -[ ] make this a channel method, OR
# -[ ] just put back inline below?
#
# await tractor.pause(shield=True)
# TODO, go back to inlining this..
def maybe_raise_aio_side_err(
trio_err: Exception,
) -> None:
@ -1055,10 +1028,11 @@ async def translate_aio_errors(
None
) = chan._trio_to_raise
if not suppress_graceful_exits:
raise trio_to_raise from (aio_err or trio_err)
if trio_to_raise:
if (
trio_to_raise
and
suppress_graceful_exits
):
# import pdbp; pdbp.set_trace()
match (
trio_to_raise,
@ -1066,7 +1040,7 @@ async def translate_aio_errors(
):
case (
AsyncioTaskExited(),
trio.Cancelled()|None,
None|trio.Cancelled(),
):
log.info(
'Ignoring aio exit signal since trio also exited!'
@ -1077,12 +1051,11 @@ async def translate_aio_errors(
AsyncioCancelled(),
trio.Cancelled(),
):
if not aio_done_before_trio:
log.info(
'Ignoring aio cancelled signal since trio was also cancelled!'
)
return
case _:
raise trio_to_raise from (aio_err or trio_err)
# Check if the asyncio-side is the cause of the trio-side
@ -1155,7 +1128,6 @@ async def run_task(
async with translate_aio_errors(
chan,
wait_on_aio_task=True,
suppress_graceful_exits=chan._suppress_graceful_exits,
):
# return single value that is the output from the
# ``asyncio`` function-as-task. Expect the mem chan api
@ -1171,8 +1143,7 @@ async def run_task(
async def open_channel_from(
target: Callable[..., Any],
suppress_graceful_exits: bool = True,
**target_kwargs,
**kwargs,
) -> AsyncIterator[Any]:
'''
@ -1184,15 +1155,13 @@ async def open_channel_from(
target,
qsize=2**8,
provide_channels=True,
suppress_graceful_exits=suppress_graceful_exits,
**target_kwargs,
**kwargs,
)
# TODO, tuple form here?
async with chan._from_aio:
async with translate_aio_errors(
chan,
wait_on_aio_task=True,
suppress_graceful_exits=suppress_graceful_exits,
):
# sync to a "started()"-like first delivered value from the
# ``asyncio`` task.