Compare commits

..

6 Commits

Author SHA1 Message Date
Tyler Goodlet 54658016f1 Check off REPL-ing todo add masked usage in `drain_to_final_msg()` 2025-09-09 18:13:28 -04:00
Tyler Goodlet 1975d5bd86 Timeout on `test_peer_spawns_and_cancels_service_subactor`
While working on a fix to the hang case found from
`test_cancel_ctx_with_parent_side_entered_in_bg_task` an initial
solution caused this test to hang indefinitely; solve it with a small
wrapping `_main()` + `trio.fail_after()` entrypoint.

Further suite refinements,
- move the top-most `try:`->`else:` block
- toss in a masked base-exc block for tracing unexpected
  `ctx.wait_for_result()` outcomes.
- tweak the `raise_sub_spawn_error_after` to be an optional `float`
  which scales the `rng_seed: int = 50` msg counter to
  `tell_little_bro()` so that the abs value to the `range()` can be
  changed.
2025-09-09 17:33:20 -04:00
Tyler Goodlet b83d8bb91d Rename var for and hide the `_open_and_supervise_one_cancels_all_nursery` frame 2025-09-08 18:15:00 -04:00
Tyler Goodlet 81ba90492b Add timeout around `test_peer_spawns_and_cancels_service_subactor` suite 2025-09-08 17:58:02 -04:00
Tyler Goodlet 16bfc35c09 Parametrize with `Portal.cancel_actor()` only case
Such that when `maybe_context.cancel()` is not called (explicitly) and
only the subactor is cancelled by its parent we expect to see a ctxc
raised both from any call to `Context.wait_for_result()` and out of
the `Portal.open_context()` scope, up to the `trio.run()`.

Deats,
- obvi call-n-catch the ctxc (in scope) for the oob-only
  subactor-cancelled case.
- add branches around `trio.run()` entry to match.
2025-09-08 17:39:54 -04:00
Tyler Goodlet 3772b70e54 Add the minimal OoB cancel edge case from #391
Discovered while writing a `@context` sanity test to verify unmasker
ignore-cases support. Masked code is due to the process of finding the
minimal example causing the original hang discovered in the original
examples script. Details are in the test-fn doc strings and surrounding
comments; more refinement and cleanup coming obviously.

Also moved over the self-cancel todos from the inter-peer tests module.
2025-09-08 11:54:28 -04:00
4 changed files with 291 additions and 44 deletions

View File

@ -24,14 +24,10 @@ from tractor._testing import (
)
# XXX TODO cases:
# - [ ] peer cancelled itself - so other peers should
# get errors reflecting that the peer was itself the .canceller?
# - [x] WE cancelled the peer and thus should not see any raised
# `ContextCancelled` as it should be reaped silently?
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
# already covers this case?
# - [x] INTER-PEER: some arbitrary remote peer cancels via
# Portal.cancel_actor().
# => all other connected peers should get that cancel requesting peer's
@ -44,16 +40,6 @@ from tractor._testing import (
# that also spawned a remote task task in that same peer-parent.
# def test_self_cancel():
# '''
# 2 cases:
# - calls `Actor.cancel()` locally in some task
# - calls LocalPortal.cancel_actor()` ?
# '''
# ...
@tractor.context
async def open_stream_then_sleep_forever(
ctx: Context,
@ -806,7 +792,7 @@ async def basic_echo_server(
ctx: Context,
peer_name: str = 'wittle_bruv',
err_after: int|None = None,
err_after_imsg: int|None = None,
) -> None:
'''
@ -835,8 +821,9 @@ async def basic_echo_server(
await ipc.send(resp)
if (
err_after
and i > err_after
err_after_imsg
and
i > err_after_imsg
):
raise RuntimeError(
f'Simulated error in `{peer_name}`'
@ -978,7 +965,8 @@ async def tell_little_bro(
actor_name: str,
caller: str = '',
err_after: int|None = None,
err_after: float|None = None,
rng_seed: int = 50,
):
# contact target actor, do a stream dialog.
async with (
@ -989,14 +977,18 @@ async def tell_little_bro(
basic_echo_server,
# XXX proxy any delayed err condition
err_after=err_after,
err_after_imsg=(
err_after * rng_seed
if err_after is not None
else None
),
) as (sub_ctx, first),
sub_ctx.open_stream() as echo_ipc,
):
actor: Actor = current_actor()
uid: tuple = actor.uid
for i in range(100):
for i in range(rng_seed):
msg: tuple = (
uid,
i,
@ -1021,13 +1013,13 @@ async def tell_little_bro(
)
@pytest.mark.parametrize(
'raise_sub_spawn_error_after',
[None, 50],
[None, 0.5],
)
def test_peer_spawns_and_cancels_service_subactor(
debug_mode: bool,
raise_client_error: str,
reg_addr: tuple[str, int],
raise_sub_spawn_error_after: int|None,
raise_sub_spawn_error_after: float|None,
):
# NOTE: this tests for the modden `mod wks open piker` bug
# discovered as part of implementing workspace ctx
@ -1041,6 +1033,7 @@ def test_peer_spawns_and_cancels_service_subactor(
# and the server's spawned child should cancel and terminate!
peer_name: str = 'little_bro'
def check_inner_rte(rae: RemoteActorError):
'''
Validate the little_bro's relayed inception!
@ -1134,8 +1127,7 @@ def test_peer_spawns_and_cancels_service_subactor(
)
try:
res = await client_ctx.result(hide_tb=False)
res = await client_ctx.wait_for_result(hide_tb=False)
# in remote (relayed inception) error
# case, we should error on the line above!
if raise_sub_spawn_error_after:
@ -1146,6 +1138,23 @@ def test_peer_spawns_and_cancels_service_subactor(
assert isinstance(res, ContextCancelled)
assert client_ctx.cancel_acked
assert res.canceller == root.uid
assert not raise_sub_spawn_error_after
# cancelling the spawner sub should
# transitively cancel it's sub, the little
# bruv.
print('root cancelling server/client sub-actors')
await spawn_ctx.cancel()
async with tractor.find_actor(
name=peer_name,
) as sub:
assert not sub
# XXX, only for tracing
# except BaseException as _berr:
# berr = _berr
# await tractor.pause(shield=True)
# raise berr
except RemoteActorError as rae:
_err = rae
@ -1174,19 +1183,8 @@ def test_peer_spawns_and_cancels_service_subactor(
raise
# await tractor.pause()
else:
assert not raise_sub_spawn_error_after
# cancelling the spawner sub should
# transitively cancel it's sub, the little
# bruv.
print('root cancelling server/client sub-actors')
await spawn_ctx.cancel()
async with tractor.find_actor(
name=peer_name,
) as sub:
assert not sub
# await tractor.pause()
# await server.cancel_actor()
except RemoteActorError as rae:
@ -1199,7 +1197,7 @@ def test_peer_spawns_and_cancels_service_subactor(
# since we called `.cancel_actor()`, `.cancel_ack`
# will not be set on the ctx bc `ctx.cancel()` was not
# called directly fot this confext.
# called directly for this confext.
except ContextCancelled as ctxc:
_ctxc = ctxc
print(
@ -1239,12 +1237,19 @@ def test_peer_spawns_and_cancels_service_subactor(
# assert spawn_ctx.cancelled_caught
async def _main():
with trio.fail_after(
3 if not debug_mode
else 999
):
await main()
if raise_sub_spawn_error_after:
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
trio.run(_main)
rae: RemoteActorError = excinfo.value
check_inner_rte(rae)
else:
trio.run(main)
trio.run(_main)

View File

@ -0,0 +1,239 @@
'''
Define the details of inter-actor "out-of-band" (OoB) cancel
semantics, that is how cancellation works when a cancel request comes
from the different concurrency (primitive's) "layer" then where the
eventual `trio.Task` actually raises a signal.
'''
from functools import partial
# from contextlib import asynccontextmanager as acm
# import itertools
import pytest
import trio
import tractor
from tractor import ( # typing
ActorNursery,
Portal,
Context,
# ContextCancelled,
# RemoteActorError,
)
# from tractor._testing import (
# tractor_test,
# expect_ctxc,
# )
# XXX TODO cases:
# - [ ] peer cancelled itself - so other peers should
# get errors reflecting that the peer was itself the .canceller?
# def test_self_cancel():
# '''
# 2 cases:
# - calls `Actor.cancel()` locally in some task
# - calls LocalPortal.cancel_actor()` ?
#
# things to ensure!
# -[ ] the ctxc raised in a child should ideally show the tb of the
# underlying `Cancelled` checkpoint, i.e.
# `raise scope_error from ctxc`?
#
# -[ ] a self-cancelled context, if not allowed to block on
# `ctx.result()` at some point will hang since the `ctx._scope`
# is never `.cancel_called`; cases for this include,
# - an `open_ctx()` which never starteds before being OoB actor
# cancelled.
# |_ parent task will be blocked in `.open_context()` for the
# `Started` msg, and when the OoB ctxc arrives `ctx._scope`
# will never have been signalled..
# '''
# ...
# TODO, sanity test against the case in `/examples/trio/lockacquire_not_unmasked.py`
# but with the `Lock.acquire()` from a `@context` to ensure the
# implicit ignore-case-non-unmasking.
#
# @tractor.context
# async def acquire_actor_global_lock(
# ctx: tractor.Context,
# ignore_special_cases: bool,
# ):
# async with maybe_unmask_excs(
# ignore_special_cases=ignore_special_cases,
# ):
# await ctx.started('locked')
# # block til cancelled
# await trio.sleep_forever()
@tractor.context
async def sleep_forever(
ctx: tractor.Context,
# ignore_special_cases: bool,
do_started: bool,
):
# async with maybe_unmask_excs(
# ignore_special_cases=ignore_special_cases,
# ):
# await ctx.started('locked')
if do_started:
await ctx.started()
# block til cancelled
print('sleepin on child-side..')
await trio.sleep_forever()
@pytest.mark.parametrize(
'cancel_ctx',
[True, False],
)
def test_cancel_ctx_with_parent_side_entered_in_bg_task(
debug_mode: bool,
loglevel: str,
cancel_ctx: bool,
):
'''
The most "basic" out-of-band-task self-cancellation case where
`Portal.open_context()` is entered in a bg task and the
parent-task (of the containing nursery) calls `Context.cancel()`
without the child knowing; the `Context._scope` should be
`.cancel_called` when the IPC ctx's child-side relays
a `ContextCancelled` with a `.canceller` set to the parent
actor('s task).
'''
async def main():
with trio.fail_after(
2 if not debug_mode else 999,
):
an: ActorNursery
async with (
tractor.open_nursery(
debug_mode=debug_mode,
loglevel='devx',
enable_stack_on_sig=True,
) as an,
trio.open_nursery() as tn,
):
ptl: Portal = await an.start_actor(
'sub',
enable_modules=[__name__],
)
async def _open_ctx_async(
do_started: bool = True,
task_status=trio.TASK_STATUS_IGNORED,
):
# do we expect to never enter the
# `.open_context()` below.
if not do_started:
task_status.started()
async with ptl.open_context(
sleep_forever,
do_started=do_started,
) as (ctx, first):
task_status.started(ctx)
await trio.sleep_forever()
# XXX, this is the key OoB part!
#
# - start the `.open_context()` in a bg task which
# blocks inside the embedded scope-body,
#
# - when we call `Context.cancel()` it **is
# not** from the same task which eventually runs
# `.__aexit__()`,
#
# - since the bg "opener" task will be in
# a `trio.sleep_forever()`, it must be interrupted
# by the `ContextCancelled` delivered from the
# child-side; `Context._scope: CancelScope` MUST
# be `.cancel_called`!
#
print('ASYNC opening IPC context in subtask..')
maybe_ctx: Context|None = await tn.start(partial(
_open_ctx_async,
))
if (
maybe_ctx
and
cancel_ctx
):
print('cancelling first IPC ctx!')
await maybe_ctx.cancel()
# XXX, note that despite `maybe_context.cancel()`
# being called above, it's the parent (bg) task
# which was originally never interrupted in
# the `ctx._scope` body due to missing case logic in
# `ctx._maybe_cancel_and_set_remote_error()`.
#
# It didn't matter that the subactor process was
# already terminated and reaped, nothing was
# cancelling the ctx-parent task's scope!
#
print('cancelling subactor!')
await ptl.cancel_actor()
if maybe_ctx:
try:
await maybe_ctx.wait_for_result()
except tractor.ContextCancelled as ctxc:
assert not cancel_ctx
assert (
ctxc.canceller
==
tractor.current_actor().aid.uid
)
# don't re-raise since it'll trigger
# an EG from the above tn.
if cancel_ctx:
# graceful self-cancel
trio.run(main)
else:
# ctx parent task should see OoB ctxc due to
# `ptl.cancel_actor()`.
with pytest.raises(tractor.ContextCancelled) as excinfo:
trio.run(main)
'root' in excinfo.value.canceller[0]
# def test_parent_actor_cancels_subactor_with_gt1_ctxs_open_to_it(
# debug_mode: bool,
# loglevel: str,
# ):
# '''
# Demos OoB cancellation from the perspective of a ctx opened with
# a child subactor where the parent cancels the child at the "actor
# layer" using `Portal.cancel_actor()` and thus the
# `ContextCancelled.canceller` received by the ctx's parent-side
# task will appear to be a "self cancellation" even though that
# specific task itself was not cancelled and thus
# `Context.cancel_called ==False`.
# '''
# TODO, do we have an existing implied ctx
# cancel test like this?
# with trio.move_on_after(0.5):# as cs:
# await _open_ctx_async(
# do_started=False,
# )
# in-line ctx scope should definitely raise
# a ctxc with `.canceller = 'root'`
# async with ptl.open_context(
# sleep_forever,
# do_started=True,
# ) as pair:

View File

@ -446,12 +446,12 @@ class ActorNursery:
@acm
async def _open_and_supervise_one_cancels_all_nursery(
actor: Actor,
tb_hide: bool = False,
hide_tb: bool = True,
) -> typing.AsyncGenerator[ActorNursery, None]:
# normally don't need to show user by default
__tracebackhide__: bool = tb_hide
__tracebackhide__: bool = hide_tb
outer_err: BaseException|None = None
inner_err: BaseException|None = None

View File

@ -613,10 +613,9 @@ async def drain_to_final_msg(
# msg: dict = await ctx._rx_chan.receive()
# if res_cs.cancelled_caught:
#
# -[ ] make sure pause points work here for REPLing
# -[x] make sure pause points work here for REPLing
# the runtime itself; i.e. ensure there's no hangs!
# |_from tractor.devx.debug import pause
# await pause()
# |_see masked code below in .cancel_called path
# NOTE: we get here if the far end was
# `ContextCancelled` in 2 cases:
@ -652,6 +651,10 @@ async def drain_to_final_msg(
f'IPC ctx cancelled externally during result drain ?\n'
f'{ctx}'
)
# XXX, for tracing `Cancelled`..
# from tractor.devx.debug import pause
# await pause(shield=True)
# CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is
# the source cause of this local task's