Compare commits
No commits in common. "54658016f1a00135c08b5145e04d16f981f330f0" and "07781e38cd5dd0b60b071b5d9ca6f68457ff38da" have entirely different histories.
54658016f1
...
07781e38cd
|
@ -24,10 +24,14 @@ from tractor._testing import (
|
|||
)
|
||||
|
||||
# XXX TODO cases:
|
||||
# - [ ] peer cancelled itself - so other peers should
|
||||
# get errors reflecting that the peer was itself the .canceller?
|
||||
|
||||
# - [x] WE cancelled the peer and thus should not see any raised
|
||||
# `ContextCancelled` as it should be reaped silently?
|
||||
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
|
||||
# already covers this case?
|
||||
|
||||
# - [x] INTER-PEER: some arbitrary remote peer cancels via
|
||||
# Portal.cancel_actor().
|
||||
# => all other connected peers should get that cancel requesting peer's
|
||||
|
@ -40,6 +44,16 @@ from tractor._testing import (
|
|||
# that also spawned a remote task task in that same peer-parent.
|
||||
|
||||
|
||||
# def test_self_cancel():
|
||||
# '''
|
||||
# 2 cases:
|
||||
# - calls `Actor.cancel()` locally in some task
|
||||
# - calls LocalPortal.cancel_actor()` ?
|
||||
|
||||
# '''
|
||||
# ...
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def open_stream_then_sleep_forever(
|
||||
ctx: Context,
|
||||
|
@ -792,7 +806,7 @@ async def basic_echo_server(
|
|||
ctx: Context,
|
||||
peer_name: str = 'wittle_bruv',
|
||||
|
||||
err_after_imsg: int|None = None,
|
||||
err_after: int|None = None,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
|
@ -821,9 +835,8 @@ async def basic_echo_server(
|
|||
await ipc.send(resp)
|
||||
|
||||
if (
|
||||
err_after_imsg
|
||||
and
|
||||
i > err_after_imsg
|
||||
err_after
|
||||
and i > err_after
|
||||
):
|
||||
raise RuntimeError(
|
||||
f'Simulated error in `{peer_name}`'
|
||||
|
@ -965,8 +978,7 @@ async def tell_little_bro(
|
|||
actor_name: str,
|
||||
|
||||
caller: str = '',
|
||||
err_after: float|None = None,
|
||||
rng_seed: int = 50,
|
||||
err_after: int|None = None,
|
||||
):
|
||||
# contact target actor, do a stream dialog.
|
||||
async with (
|
||||
|
@ -977,18 +989,14 @@ async def tell_little_bro(
|
|||
basic_echo_server,
|
||||
|
||||
# XXX proxy any delayed err condition
|
||||
err_after_imsg=(
|
||||
err_after * rng_seed
|
||||
if err_after is not None
|
||||
else None
|
||||
),
|
||||
err_after=err_after,
|
||||
) as (sub_ctx, first),
|
||||
|
||||
sub_ctx.open_stream() as echo_ipc,
|
||||
):
|
||||
actor: Actor = current_actor()
|
||||
uid: tuple = actor.uid
|
||||
for i in range(rng_seed):
|
||||
for i in range(100):
|
||||
msg: tuple = (
|
||||
uid,
|
||||
i,
|
||||
|
@ -1013,13 +1021,13 @@ async def tell_little_bro(
|
|||
)
|
||||
@pytest.mark.parametrize(
|
||||
'raise_sub_spawn_error_after',
|
||||
[None, 0.5],
|
||||
[None, 50],
|
||||
)
|
||||
def test_peer_spawns_and_cancels_service_subactor(
|
||||
debug_mode: bool,
|
||||
raise_client_error: str,
|
||||
reg_addr: tuple[str, int],
|
||||
raise_sub_spawn_error_after: float|None,
|
||||
raise_sub_spawn_error_after: int|None,
|
||||
):
|
||||
# NOTE: this tests for the modden `mod wks open piker` bug
|
||||
# discovered as part of implementing workspace ctx
|
||||
|
@ -1033,7 +1041,6 @@ def test_peer_spawns_and_cancels_service_subactor(
|
|||
# and the server's spawned child should cancel and terminate!
|
||||
peer_name: str = 'little_bro'
|
||||
|
||||
|
||||
def check_inner_rte(rae: RemoteActorError):
|
||||
'''
|
||||
Validate the little_bro's relayed inception!
|
||||
|
@ -1127,7 +1134,8 @@ def test_peer_spawns_and_cancels_service_subactor(
|
|||
)
|
||||
|
||||
try:
|
||||
res = await client_ctx.wait_for_result(hide_tb=False)
|
||||
res = await client_ctx.result(hide_tb=False)
|
||||
|
||||
# in remote (relayed inception) error
|
||||
# case, we should error on the line above!
|
||||
if raise_sub_spawn_error_after:
|
||||
|
@ -1138,23 +1146,6 @@ def test_peer_spawns_and_cancels_service_subactor(
|
|||
assert isinstance(res, ContextCancelled)
|
||||
assert client_ctx.cancel_acked
|
||||
assert res.canceller == root.uid
|
||||
assert not raise_sub_spawn_error_after
|
||||
|
||||
# cancelling the spawner sub should
|
||||
# transitively cancel it's sub, the little
|
||||
# bruv.
|
||||
print('root cancelling server/client sub-actors')
|
||||
await spawn_ctx.cancel()
|
||||
async with tractor.find_actor(
|
||||
name=peer_name,
|
||||
) as sub:
|
||||
assert not sub
|
||||
|
||||
# XXX, only for tracing
|
||||
# except BaseException as _berr:
|
||||
# berr = _berr
|
||||
# await tractor.pause(shield=True)
|
||||
# raise berr
|
||||
|
||||
except RemoteActorError as rae:
|
||||
_err = rae
|
||||
|
@ -1183,8 +1174,19 @@ def test_peer_spawns_and_cancels_service_subactor(
|
|||
raise
|
||||
# await tractor.pause()
|
||||
|
||||
else:
|
||||
assert not raise_sub_spawn_error_after
|
||||
|
||||
# cancelling the spawner sub should
|
||||
# transitively cancel it's sub, the little
|
||||
# bruv.
|
||||
print('root cancelling server/client sub-actors')
|
||||
await spawn_ctx.cancel()
|
||||
async with tractor.find_actor(
|
||||
name=peer_name,
|
||||
) as sub:
|
||||
assert not sub
|
||||
|
||||
# await tractor.pause()
|
||||
# await server.cancel_actor()
|
||||
|
||||
except RemoteActorError as rae:
|
||||
|
@ -1197,7 +1199,7 @@ def test_peer_spawns_and_cancels_service_subactor(
|
|||
|
||||
# since we called `.cancel_actor()`, `.cancel_ack`
|
||||
# will not be set on the ctx bc `ctx.cancel()` was not
|
||||
# called directly for this confext.
|
||||
# called directly fot this confext.
|
||||
except ContextCancelled as ctxc:
|
||||
_ctxc = ctxc
|
||||
print(
|
||||
|
@ -1237,19 +1239,12 @@ def test_peer_spawns_and_cancels_service_subactor(
|
|||
|
||||
# assert spawn_ctx.cancelled_caught
|
||||
|
||||
async def _main():
|
||||
with trio.fail_after(
|
||||
3 if not debug_mode
|
||||
else 999
|
||||
):
|
||||
await main()
|
||||
|
||||
if raise_sub_spawn_error_after:
|
||||
with pytest.raises(RemoteActorError) as excinfo:
|
||||
trio.run(_main)
|
||||
trio.run(main)
|
||||
|
||||
rae: RemoteActorError = excinfo.value
|
||||
check_inner_rte(rae)
|
||||
|
||||
else:
|
||||
trio.run(_main)
|
||||
trio.run(main)
|
||||
|
|
|
@ -1,239 +0,0 @@
|
|||
'''
|
||||
Define the details of inter-actor "out-of-band" (OoB) cancel
|
||||
semantics, that is how cancellation works when a cancel request comes
|
||||
from the different concurrency (primitive's) "layer" then where the
|
||||
eventual `trio.Task` actually raises a signal.
|
||||
|
||||
'''
|
||||
from functools import partial
|
||||
# from contextlib import asynccontextmanager as acm
|
||||
# import itertools
|
||||
|
||||
import pytest
|
||||
import trio
|
||||
import tractor
|
||||
from tractor import ( # typing
|
||||
ActorNursery,
|
||||
Portal,
|
||||
Context,
|
||||
# ContextCancelled,
|
||||
# RemoteActorError,
|
||||
)
|
||||
# from tractor._testing import (
|
||||
# tractor_test,
|
||||
# expect_ctxc,
|
||||
# )
|
||||
|
||||
# XXX TODO cases:
|
||||
# - [ ] peer cancelled itself - so other peers should
|
||||
# get errors reflecting that the peer was itself the .canceller?
|
||||
|
||||
# def test_self_cancel():
|
||||
# '''
|
||||
# 2 cases:
|
||||
# - calls `Actor.cancel()` locally in some task
|
||||
# - calls LocalPortal.cancel_actor()` ?
|
||||
#
|
||||
# things to ensure!
|
||||
# -[ ] the ctxc raised in a child should ideally show the tb of the
|
||||
# underlying `Cancelled` checkpoint, i.e.
|
||||
# `raise scope_error from ctxc`?
|
||||
#
|
||||
# -[ ] a self-cancelled context, if not allowed to block on
|
||||
# `ctx.result()` at some point will hang since the `ctx._scope`
|
||||
# is never `.cancel_called`; cases for this include,
|
||||
# - an `open_ctx()` which never starteds before being OoB actor
|
||||
# cancelled.
|
||||
# |_ parent task will be blocked in `.open_context()` for the
|
||||
# `Started` msg, and when the OoB ctxc arrives `ctx._scope`
|
||||
# will never have been signalled..
|
||||
|
||||
# '''
|
||||
# ...
|
||||
|
||||
# TODO, sanity test against the case in `/examples/trio/lockacquire_not_unmasked.py`
|
||||
# but with the `Lock.acquire()` from a `@context` to ensure the
|
||||
# implicit ignore-case-non-unmasking.
|
||||
#
|
||||
# @tractor.context
|
||||
# async def acquire_actor_global_lock(
|
||||
# ctx: tractor.Context,
|
||||
# ignore_special_cases: bool,
|
||||
# ):
|
||||
|
||||
# async with maybe_unmask_excs(
|
||||
# ignore_special_cases=ignore_special_cases,
|
||||
# ):
|
||||
# await ctx.started('locked')
|
||||
|
||||
# # block til cancelled
|
||||
# await trio.sleep_forever()
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def sleep_forever(
|
||||
ctx: tractor.Context,
|
||||
# ignore_special_cases: bool,
|
||||
do_started: bool,
|
||||
):
|
||||
|
||||
# async with maybe_unmask_excs(
|
||||
# ignore_special_cases=ignore_special_cases,
|
||||
# ):
|
||||
# await ctx.started('locked')
|
||||
if do_started:
|
||||
await ctx.started()
|
||||
|
||||
# block til cancelled
|
||||
print('sleepin on child-side..')
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'cancel_ctx',
|
||||
[True, False],
|
||||
)
|
||||
def test_cancel_ctx_with_parent_side_entered_in_bg_task(
|
||||
debug_mode: bool,
|
||||
loglevel: str,
|
||||
cancel_ctx: bool,
|
||||
):
|
||||
'''
|
||||
The most "basic" out-of-band-task self-cancellation case where
|
||||
`Portal.open_context()` is entered in a bg task and the
|
||||
parent-task (of the containing nursery) calls `Context.cancel()`
|
||||
without the child knowing; the `Context._scope` should be
|
||||
`.cancel_called` when the IPC ctx's child-side relays
|
||||
a `ContextCancelled` with a `.canceller` set to the parent
|
||||
actor('s task).
|
||||
|
||||
'''
|
||||
async def main():
|
||||
with trio.fail_after(
|
||||
2 if not debug_mode else 999,
|
||||
):
|
||||
an: ActorNursery
|
||||
async with (
|
||||
tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
loglevel='devx',
|
||||
enable_stack_on_sig=True,
|
||||
) as an,
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
ptl: Portal = await an.start_actor(
|
||||
'sub',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
||||
async def _open_ctx_async(
|
||||
do_started: bool = True,
|
||||
task_status=trio.TASK_STATUS_IGNORED,
|
||||
):
|
||||
# do we expect to never enter the
|
||||
# `.open_context()` below.
|
||||
if not do_started:
|
||||
task_status.started()
|
||||
|
||||
async with ptl.open_context(
|
||||
sleep_forever,
|
||||
do_started=do_started,
|
||||
) as (ctx, first):
|
||||
task_status.started(ctx)
|
||||
await trio.sleep_forever()
|
||||
|
||||
# XXX, this is the key OoB part!
|
||||
#
|
||||
# - start the `.open_context()` in a bg task which
|
||||
# blocks inside the embedded scope-body,
|
||||
#
|
||||
# - when we call `Context.cancel()` it **is
|
||||
# not** from the same task which eventually runs
|
||||
# `.__aexit__()`,
|
||||
#
|
||||
# - since the bg "opener" task will be in
|
||||
# a `trio.sleep_forever()`, it must be interrupted
|
||||
# by the `ContextCancelled` delivered from the
|
||||
# child-side; `Context._scope: CancelScope` MUST
|
||||
# be `.cancel_called`!
|
||||
#
|
||||
print('ASYNC opening IPC context in subtask..')
|
||||
maybe_ctx: Context|None = await tn.start(partial(
|
||||
_open_ctx_async,
|
||||
))
|
||||
|
||||
if (
|
||||
maybe_ctx
|
||||
and
|
||||
cancel_ctx
|
||||
):
|
||||
print('cancelling first IPC ctx!')
|
||||
await maybe_ctx.cancel()
|
||||
|
||||
# XXX, note that despite `maybe_context.cancel()`
|
||||
# being called above, it's the parent (bg) task
|
||||
# which was originally never interrupted in
|
||||
# the `ctx._scope` body due to missing case logic in
|
||||
# `ctx._maybe_cancel_and_set_remote_error()`.
|
||||
#
|
||||
# It didn't matter that the subactor process was
|
||||
# already terminated and reaped, nothing was
|
||||
# cancelling the ctx-parent task's scope!
|
||||
#
|
||||
print('cancelling subactor!')
|
||||
await ptl.cancel_actor()
|
||||
|
||||
if maybe_ctx:
|
||||
try:
|
||||
await maybe_ctx.wait_for_result()
|
||||
except tractor.ContextCancelled as ctxc:
|
||||
assert not cancel_ctx
|
||||
assert (
|
||||
ctxc.canceller
|
||||
==
|
||||
tractor.current_actor().aid.uid
|
||||
)
|
||||
# don't re-raise since it'll trigger
|
||||
# an EG from the above tn.
|
||||
|
||||
if cancel_ctx:
|
||||
# graceful self-cancel
|
||||
trio.run(main)
|
||||
|
||||
else:
|
||||
# ctx parent task should see OoB ctxc due to
|
||||
# `ptl.cancel_actor()`.
|
||||
with pytest.raises(tractor.ContextCancelled) as excinfo:
|
||||
trio.run(main)
|
||||
|
||||
'root' in excinfo.value.canceller[0]
|
||||
|
||||
|
||||
# def test_parent_actor_cancels_subactor_with_gt1_ctxs_open_to_it(
|
||||
# debug_mode: bool,
|
||||
# loglevel: str,
|
||||
# ):
|
||||
# '''
|
||||
# Demos OoB cancellation from the perspective of a ctx opened with
|
||||
# a child subactor where the parent cancels the child at the "actor
|
||||
# layer" using `Portal.cancel_actor()` and thus the
|
||||
# `ContextCancelled.canceller` received by the ctx's parent-side
|
||||
# task will appear to be a "self cancellation" even though that
|
||||
# specific task itself was not cancelled and thus
|
||||
# `Context.cancel_called ==False`.
|
||||
# '''
|
||||
# TODO, do we have an existing implied ctx
|
||||
# cancel test like this?
|
||||
# with trio.move_on_after(0.5):# as cs:
|
||||
# await _open_ctx_async(
|
||||
# do_started=False,
|
||||
# )
|
||||
|
||||
|
||||
# in-line ctx scope should definitely raise
|
||||
# a ctxc with `.canceller = 'root'`
|
||||
# async with ptl.open_context(
|
||||
# sleep_forever,
|
||||
# do_started=True,
|
||||
# ) as pair:
|
||||
|
|
@ -446,12 +446,12 @@ class ActorNursery:
|
|||
@acm
|
||||
async def _open_and_supervise_one_cancels_all_nursery(
|
||||
actor: Actor,
|
||||
hide_tb: bool = True,
|
||||
tb_hide: bool = False,
|
||||
|
||||
) -> typing.AsyncGenerator[ActorNursery, None]:
|
||||
|
||||
# normally don't need to show user by default
|
||||
__tracebackhide__: bool = hide_tb
|
||||
__tracebackhide__: bool = tb_hide
|
||||
|
||||
outer_err: BaseException|None = None
|
||||
inner_err: BaseException|None = None
|
||||
|
|
|
@ -613,9 +613,10 @@ async def drain_to_final_msg(
|
|||
# msg: dict = await ctx._rx_chan.receive()
|
||||
# if res_cs.cancelled_caught:
|
||||
#
|
||||
# -[x] make sure pause points work here for REPLing
|
||||
# -[ ] make sure pause points work here for REPLing
|
||||
# the runtime itself; i.e. ensure there's no hangs!
|
||||
# |_see masked code below in .cancel_called path
|
||||
# |_from tractor.devx.debug import pause
|
||||
# await pause()
|
||||
|
||||
# NOTE: we get here if the far end was
|
||||
# `ContextCancelled` in 2 cases:
|
||||
|
@ -651,10 +652,6 @@ async def drain_to_final_msg(
|
|||
f'IPC ctx cancelled externally during result drain ?\n'
|
||||
f'{ctx}'
|
||||
)
|
||||
# XXX, for tracing `Cancelled`..
|
||||
# from tractor.devx.debug import pause
|
||||
# await pause(shield=True)
|
||||
|
||||
# CASE 2: mask the local cancelled-error(s)
|
||||
# only when we are sure the remote error is
|
||||
# the source cause of this local task's
|
||||
|
|
Loading…
Reference in New Issue