forked from goodboy/tractor
Add a first serious inter-peer remote cancel suite
Tests that appropriate `Context` exit state, the relay of a `ContextCancelled` error and its `.canceller: tuple[str, str]` value are set when an inter-peer cancellation happens via an "out of band" request method (in this case using `Portal.cancel_actor()` and that cancellation is propagated "horizontally" to other peers. Verify that any such cancellation scenario which also experiences an "error during `ContextCancelled` handling" DOES NOT result in that further error being suppressed and that the user's exception bubbles out of the `Context.open_context()` block(s) appropriately! Likely more tests to come as well as some factoring of the teardown state checks where possible. Pertains to serious testing the major work landing in #357multihomed
parent
87c1113de4
commit
ca3f7a1b6b
|
@ -3,30 +3,32 @@ Codify the cancellation request semantics in terms
|
||||||
of one remote actor cancelling another.
|
of one remote actor cancelling another.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from contextlib import asynccontextmanager as acm
|
# from contextlib import asynccontextmanager as acm
|
||||||
|
import itertools
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
from tractor._exceptions import (
|
from tractor import ( # typing
|
||||||
StreamOverrun,
|
Portal,
|
||||||
|
Context,
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_self_cancel():
|
# def test_self_cancel():
|
||||||
'''
|
# '''
|
||||||
2 cases:
|
# 2 cases:
|
||||||
- calls `Actor.cancel()` locally in some task
|
# - calls `Actor.cancel()` locally in some task
|
||||||
- calls LocalPortal.cancel_actor()` ?
|
# - calls LocalPortal.cancel_actor()` ?
|
||||||
|
|
||||||
'''
|
# '''
|
||||||
...
|
# ...
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def sleep_forever(
|
async def sleep_forever(
|
||||||
ctx: tractor.Context,
|
ctx: Context,
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Sync the context, open a stream then just sleep.
|
Sync the context, open a stream then just sleep.
|
||||||
|
@ -37,13 +39,19 @@ async def sleep_forever(
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@tractor.context
|
||||||
async def attach_to_sleep_forever():
|
async def error_before_started(
|
||||||
|
ctx: Context,
|
||||||
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Cancel a context **before** any underlying error is raised in order
|
This simulates exactly an original bug discovered in:
|
||||||
to trigger a local reception of a ``ContextCancelled`` which **should not**
|
https://github.com/pikers/piker/issues/244
|
||||||
be re-raised in the local surrounding ``Context`` *iff* the cancel was
|
|
||||||
requested by **this** side of the context.
|
Cancel a context **before** any underlying error is raised so
|
||||||
|
as to trigger a local reception of a ``ContextCancelled`` which
|
||||||
|
SHOULD NOT be re-raised in the local surrounding ``Context``
|
||||||
|
*iff* the cancel was requested by **this** (callee) side of
|
||||||
|
the context.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async with tractor.wait_for_actor('sleeper') as p2:
|
async with tractor.wait_for_actor('sleeper') as p2:
|
||||||
|
@ -51,8 +59,16 @@ async def attach_to_sleep_forever():
|
||||||
p2.open_context(sleep_forever) as (peer_ctx, first),
|
p2.open_context(sleep_forever) as (peer_ctx, first),
|
||||||
peer_ctx.open_stream(),
|
peer_ctx.open_stream(),
|
||||||
):
|
):
|
||||||
|
# NOTE: this WAS inside an @acm body but i factored it
|
||||||
|
# out and just put it inline here since i don't think
|
||||||
|
# the mngr part really matters, though maybe it could?
|
||||||
try:
|
try:
|
||||||
yield
|
# XXX NOTE XXX: THIS sends an UNSERIALIZABLE TYPE which
|
||||||
|
# should raise a `TypeError` and **NOT BE SWALLOWED** by
|
||||||
|
# the surrounding try/finally (normally inside the
|
||||||
|
# body of some acm)..
|
||||||
|
await ctx.started(object())
|
||||||
|
# yield
|
||||||
finally:
|
finally:
|
||||||
# XXX: previously this would trigger local
|
# XXX: previously this would trigger local
|
||||||
# ``ContextCancelled`` to be received and raised in the
|
# ``ContextCancelled`` to be received and raised in the
|
||||||
|
@ -71,23 +87,6 @@ async def attach_to_sleep_forever():
|
||||||
await peer_ctx.cancel()
|
await peer_ctx.cancel()
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
|
||||||
async def error_before_started(
|
|
||||||
ctx: tractor.Context,
|
|
||||||
) -> None:
|
|
||||||
'''
|
|
||||||
This simulates exactly an original bug discovered in:
|
|
||||||
https://github.com/pikers/piker/issues/244
|
|
||||||
|
|
||||||
'''
|
|
||||||
async with attach_to_sleep_forever():
|
|
||||||
|
|
||||||
# XXX NOTE XXX: THIS sends an UNSERIALIZABLE TYPE which
|
|
||||||
# should raise a `TypeError` and **NOT BE SWALLOWED** by
|
|
||||||
# the surrounding acm!!?!
|
|
||||||
await ctx.started(object())
|
|
||||||
|
|
||||||
|
|
||||||
def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
|
def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
|
||||||
'''
|
'''
|
||||||
Verify that an error raised in a remote context which itself
|
Verify that an error raised in a remote context which itself
|
||||||
|
@ -121,93 +120,332 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def sleep_a_bit_then_cancel_sleeper(
|
async def sleep_a_bit_then_cancel_peer(
|
||||||
ctx: tractor.Context,
|
ctx: Context,
|
||||||
|
peer_name: str = 'sleeper',
|
||||||
|
cancel_after: float = .5,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
async with tractor.wait_for_actor('sleeper') as sleeper:
|
|
||||||
await ctx.started()
|
|
||||||
# await trio.sleep_forever()
|
|
||||||
await trio.sleep(3)
|
|
||||||
# async with tractor.wait_for_actor('sleeper') as sleeper:
|
|
||||||
await sleeper.cancel_actor()
|
|
||||||
|
|
||||||
|
|
||||||
def test_peer_canceller():
|
|
||||||
'''
|
'''
|
||||||
Verify that a cancellation triggered by a peer (whether in tree
|
Connect to peer, sleep as per input delay, cancel the peer.
|
||||||
or not) results in a cancelled error with
|
|
||||||
a `ContextCancelled.errorer` matching the requesting actor.
|
|
||||||
|
|
||||||
cases:
|
'''
|
||||||
- some arbitrary remote peer cancels via Portal.cancel_actor().
|
peer: Portal
|
||||||
=> all other connected peers should get that cancel requesting peer's
|
async with tractor.wait_for_actor(peer_name) as peer:
|
||||||
uid in the ctx-cancelled error msg.
|
await ctx.started()
|
||||||
|
await trio.sleep(cancel_after)
|
||||||
|
await peer.cancel_actor()
|
||||||
|
|
||||||
- peer spawned a sub-actor which (also) spawned a failing task
|
|
||||||
which was unhandled and propagated up to the immediate
|
|
||||||
parent, the peer to the actor that also spawned a remote task
|
|
||||||
task in that same peer-parent.
|
|
||||||
|
|
||||||
- peer cancelled itself - so other peers should
|
@tractor.context
|
||||||
get errors reflecting that the peer was itself the .canceller?
|
async def stream_ints(
|
||||||
|
ctx: Context,
|
||||||
|
):
|
||||||
|
await ctx.started()
|
||||||
|
async with ctx.open_stream() as stream:
|
||||||
|
for i in itertools.count():
|
||||||
|
await stream.send(i)
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def stream_from_peer(
|
||||||
|
ctx: Context,
|
||||||
|
peer_name: str = 'sleeper',
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
peer: Portal
|
||||||
|
try:
|
||||||
|
async with (
|
||||||
|
tractor.wait_for_actor(peer_name) as peer,
|
||||||
|
peer.open_context(stream_ints) as (peer_ctx, first),
|
||||||
|
peer_ctx.open_stream() as stream,
|
||||||
|
):
|
||||||
|
await ctx.started()
|
||||||
|
# XXX TODO: big set of questions for this
|
||||||
|
# - should we raise `ContextCancelled` or `Cancelled` (rn
|
||||||
|
# it does that) here?!
|
||||||
|
# - test the `ContextCancelled` OUTSIDE the
|
||||||
|
# `.open_context()` call?
|
||||||
|
try:
|
||||||
|
async for msg in stream:
|
||||||
|
print(msg)
|
||||||
|
|
||||||
|
except trio.Cancelled:
|
||||||
|
assert not ctx.cancel_called
|
||||||
|
assert not ctx.cancelled_caught
|
||||||
|
|
||||||
|
assert not peer_ctx.cancel_called
|
||||||
|
assert not peer_ctx.cancelled_caught
|
||||||
|
|
||||||
|
assert 'root' in ctx.cancel_called_remote
|
||||||
|
|
||||||
|
raise # XXX MUST NEVER MASK IT!!
|
||||||
|
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
await tractor.pause()
|
||||||
|
# pass
|
||||||
|
# pytest.fail(
|
||||||
|
raise RuntimeError(
|
||||||
|
'peer never triggered local `[Context]Cancelled`?!?'
|
||||||
|
)
|
||||||
|
|
||||||
|
# NOTE: cancellation of the (sleeper) peer should always
|
||||||
|
# cause a `ContextCancelled` raise in this streaming
|
||||||
|
# actor.
|
||||||
|
except ContextCancelled as ctxerr:
|
||||||
|
assert ctxerr.canceller == 'canceller'
|
||||||
|
assert ctxerr._remote_error is ctxerr
|
||||||
|
|
||||||
|
# CASE 1: we were cancelled by our parent, the root actor.
|
||||||
|
# TODO: there are other cases depending on how the root
|
||||||
|
# actor and it's caller side task are written:
|
||||||
|
# - if the root does not req us to cancel then an
|
||||||
|
# IPC-transport related error should bubble from the async
|
||||||
|
# for loop and thus cause local cancellation both here
|
||||||
|
# and in the root (since in that case this task cancels the
|
||||||
|
# context with the root, not the other way around)
|
||||||
|
assert ctx.cancel_called_remote[0] == 'root'
|
||||||
|
raise
|
||||||
|
|
||||||
|
# except BaseException as err:
|
||||||
|
|
||||||
|
# raise
|
||||||
|
|
||||||
|
# cases:
|
||||||
|
# - some arbitrary remote peer cancels via Portal.cancel_actor().
|
||||||
|
# => all other connected peers should get that cancel requesting peer's
|
||||||
|
# uid in the ctx-cancelled error msg.
|
||||||
|
|
||||||
|
# - peer spawned a sub-actor which (also) spawned a failing task
|
||||||
|
# which was unhandled and propagated up to the immediate
|
||||||
|
# parent, the peer to the actor that also spawned a remote task
|
||||||
|
# task in that same peer-parent.
|
||||||
|
|
||||||
|
# - peer cancelled itself - so other peers should
|
||||||
|
# get errors reflecting that the peer was itself the .canceller?
|
||||||
|
|
||||||
|
# - WE cancelled the peer and thus should not see any raised
|
||||||
|
# `ContextCancelled` as it should be reaped silently?
|
||||||
|
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
|
||||||
|
# already covers this case?
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'error_during_ctxerr_handling',
|
||||||
|
[False, True],
|
||||||
|
)
|
||||||
|
def test_peer_canceller(
|
||||||
|
error_during_ctxerr_handling: bool,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Verify that a cancellation triggered by an in-actor-tree peer
|
||||||
|
results in a cancelled errors with all other actors which have
|
||||||
|
opened contexts to that same actor.
|
||||||
|
|
||||||
|
legend:
|
||||||
|
name>
|
||||||
|
a "play button" that indicates a new runtime instance,
|
||||||
|
an individual actor with `name`.
|
||||||
|
|
||||||
|
.subname>
|
||||||
|
a subactor who's parent should be on some previous
|
||||||
|
line and be less indented.
|
||||||
|
|
||||||
|
.actor0> ()-> .actor1>
|
||||||
|
a inter-actor task context opened (by `async with `Portal.open_context()`)
|
||||||
|
from actor0 *into* actor1.
|
||||||
|
|
||||||
|
.actor0> ()<=> .actor1>
|
||||||
|
a inter-actor task context opened (as above)
|
||||||
|
from actor0 *into* actor1 which INCLUDES an additional
|
||||||
|
stream open using `async with Context.open_stream()`.
|
||||||
|
|
||||||
|
|
||||||
|
------ - ------
|
||||||
|
supervision view
|
||||||
|
------ - ------
|
||||||
|
root>
|
||||||
|
.sleeper> TODO: SOME SYNTAX SHOWING JUST SLEEPING
|
||||||
|
.just_caller> ()=> .sleeper>
|
||||||
|
.canceller> ()-> .sleeper>
|
||||||
|
TODO: how define calling `Portal.cancel_actor()`
|
||||||
|
|
||||||
|
In this case a `ContextCancelled` with `.errorer` set to the
|
||||||
|
requesting actor, in this case 'canceller', should be relayed
|
||||||
|
to all other actors who have also opened a (remote task)
|
||||||
|
context with that now cancelled actor.
|
||||||
|
|
||||||
|
------ - ------
|
||||||
|
task view
|
||||||
|
------ - ------
|
||||||
|
So there are 5 context open in total with 3 from the root to
|
||||||
|
its children and 2 from children to their peers:
|
||||||
|
1. root> ()-> .sleeper>
|
||||||
|
2. root> ()-> .streamer>
|
||||||
|
3. root> ()-> .canceller>
|
||||||
|
|
||||||
|
4. .streamer> ()<=> .sleep>
|
||||||
|
5. .canceller> ()-> .sleeper>
|
||||||
|
- calls `Portal.cancel_actor()`
|
||||||
|
|
||||||
- WE cancelled the peer and thus should not see any raised
|
|
||||||
`ContextCancelled` as it should be reaped silently?
|
|
||||||
=> pretty sure `test_context_stream_semantics::test_caller_cancels()`
|
|
||||||
already covers this case?
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as an:
|
||||||
canceller: tractor.Portal = await n.start_actor(
|
canceller: Portal = await an.start_actor(
|
||||||
'canceller',
|
'canceller',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
)
|
)
|
||||||
sleeper: tractor.Portal = await n.start_actor(
|
sleeper: Portal = await an.start_actor(
|
||||||
'sleeper',
|
'sleeper',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
)
|
)
|
||||||
|
just_caller: Portal = await an.start_actor(
|
||||||
|
'just_caller', # but i just met her?
|
||||||
|
enable_modules=[__name__],
|
||||||
|
)
|
||||||
|
|
||||||
async with (
|
try:
|
||||||
sleeper.open_context(
|
async with (
|
||||||
sleep_forever,
|
sleeper.open_context(
|
||||||
) as (sleeper_ctx, sent),
|
sleep_forever,
|
||||||
|
) as (sleeper_ctx, sent),
|
||||||
|
|
||||||
canceller.open_context(
|
just_caller.open_context(
|
||||||
sleep_a_bit_then_cancel_sleeper,
|
stream_from_peer,
|
||||||
) as (canceller_ctx, sent),
|
) as (caller_ctx, sent),
|
||||||
):
|
|
||||||
# await tractor.pause()
|
|
||||||
try:
|
|
||||||
print('PRE CONTEXT RESULT')
|
|
||||||
await sleeper_ctx.result()
|
|
||||||
|
|
||||||
# TODO: not sure why this isn't catching
|
canceller.open_context(
|
||||||
# but maybe we need an `ExceptionGroup` and
|
sleep_a_bit_then_cancel_peer,
|
||||||
# the whole except *errs: thinger in 3.11?
|
) as (canceller_ctx, sent),
|
||||||
except (
|
|
||||||
ContextCancelled,
|
|
||||||
) as berr:
|
|
||||||
print('CAUGHT REMOTE CONTEXT CANCEL')
|
|
||||||
|
|
||||||
# canceller should not have been remotely
|
):
|
||||||
# cancelled.
|
ctxs: list[Context] = [
|
||||||
assert canceller_ctx.cancel_called_remote is None
|
sleeper_ctx,
|
||||||
|
caller_ctx,
|
||||||
|
canceller_ctx,
|
||||||
|
]
|
||||||
|
|
||||||
# NOTE: will only enter if you wrap in
|
try:
|
||||||
# a shielded cs..
|
print('PRE CONTEXT RESULT')
|
||||||
# await tractor.pause() # TODO: shield=True)
|
await sleeper_ctx.result()
|
||||||
|
|
||||||
assert sleeper_ctx.canceller == 'canceller'
|
# should never get here
|
||||||
assert not sleep_ctx.cancelled_caught
|
pytest.fail(
|
||||||
|
'Context.result() did not raise ctx-cancelled?'
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO: not sure why this isn't catching
|
||||||
|
# but maybe we need an `ExceptionGroup` and
|
||||||
|
# the whole except *errs: thinger in 3.11?
|
||||||
|
except ContextCancelled as ctxerr:
|
||||||
|
print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}')
|
||||||
|
|
||||||
|
# canceller and caller peers should not
|
||||||
|
# have been remotely cancelled.
|
||||||
|
assert canceller_ctx.cancel_called_remote is None
|
||||||
|
assert caller_ctx.cancel_called_remote is None
|
||||||
|
|
||||||
|
assert ctxerr.canceller[0] == 'canceller'
|
||||||
|
|
||||||
|
# XXX NOTE XXX: since THIS `ContextCancelled`
|
||||||
|
# HAS NOT YET bubbled up to the
|
||||||
|
# `sleeper.open_context().__aexit__()` this
|
||||||
|
# value is not yet set, however outside this
|
||||||
|
# block it should be.
|
||||||
|
assert not sleeper_ctx.cancelled_caught
|
||||||
|
|
||||||
|
# TODO: a test which ensures this error is
|
||||||
|
# bubbled and caught (NOT MASKED) by the
|
||||||
|
# runtime!!!
|
||||||
|
if error_during_ctxerr_handling:
|
||||||
|
raise RuntimeError('Simulated error during teardown')
|
||||||
|
|
||||||
|
raise
|
||||||
|
|
||||||
|
# SHOULD NEVER GET HERE!
|
||||||
|
except BaseException:
|
||||||
|
pytest.fail('did not rx ctx-cancelled error?')
|
||||||
|
else:
|
||||||
|
pytest.fail('did not rx ctx-cancelled error?')
|
||||||
|
|
||||||
|
except (
|
||||||
|
ContextCancelled,
|
||||||
|
RuntimeError,
|
||||||
|
)as ctxerr:
|
||||||
|
_err = ctxerr
|
||||||
|
|
||||||
|
if error_during_ctxerr_handling:
|
||||||
|
assert isinstance(ctxerr, RuntimeError)
|
||||||
|
|
||||||
|
# NOTE: this root actor task should have
|
||||||
|
# called `Context.cancel()` on the
|
||||||
|
# `.__aexit__()` to every opened ctx.
|
||||||
|
for ctx in ctxs:
|
||||||
|
assert ctx.cancel_called
|
||||||
|
|
||||||
|
# each context should have received
|
||||||
|
# a silently absorbed context cancellation
|
||||||
|
# from its peer actor's task.
|
||||||
|
assert ctx.chan.uid == ctx.cancel_called_remote
|
||||||
|
|
||||||
|
# this root actor task should have
|
||||||
|
# cancelled all opened contexts except
|
||||||
|
# the sleeper which is cancelled by its
|
||||||
|
# peer "canceller"
|
||||||
|
if ctx is not sleeper_ctx:
|
||||||
|
assert ctx._remote_error.canceller[0] == 'root'
|
||||||
|
|
||||||
raise
|
|
||||||
else:
|
else:
|
||||||
raise RuntimeError('NEVER RXED EXPECTED `ContextCancelled`')
|
assert ctxerr.canceller[0] == 'canceller'
|
||||||
|
|
||||||
|
# the sleeper's remote error is the error bubbled
|
||||||
|
# out of the context-stack above!
|
||||||
|
re = sleeper_ctx._remote_error
|
||||||
|
assert re is ctxerr
|
||||||
|
|
||||||
with pytest.raises(tractor.ContextCancelled) as excinfo:
|
for ctx in ctxs:
|
||||||
trio.run(main)
|
|
||||||
|
|
||||||
assert excinfo.value.type == ContextCancelled
|
if ctx is sleeper_ctx:
|
||||||
|
assert not ctx.cancel_called
|
||||||
|
assert ctx.cancelled_caught
|
||||||
|
else:
|
||||||
|
assert ctx.cancel_called
|
||||||
|
assert not ctx.cancelled_caught
|
||||||
|
|
||||||
|
# each context should have received
|
||||||
|
# a silently absorbed context cancellation
|
||||||
|
# from its peer actor's task.
|
||||||
|
assert ctx.chan.uid == ctx.cancel_called_remote
|
||||||
|
|
||||||
|
# NOTE: when an inter-peer cancellation
|
||||||
|
# occurred, we DO NOT expect this
|
||||||
|
# root-actor-task to have requested a cancel of
|
||||||
|
# the context since cancellation was caused by
|
||||||
|
# the "canceller" peer and thus
|
||||||
|
# `Context.cancel()` SHOULD NOT have been
|
||||||
|
# called inside
|
||||||
|
# `Portal.open_context().__aexit__()`.
|
||||||
|
assert not sleeper_ctx.cancel_called
|
||||||
|
|
||||||
|
# XXX NOTE XXX: and see matching comment above but,
|
||||||
|
# this flag is set only AFTER the `.open_context()`
|
||||||
|
# has exited and should be set in both outcomes
|
||||||
|
# including the case where ctx-cancel handling
|
||||||
|
# itself errors.
|
||||||
|
assert sleeper_ctx.cancelled_caught
|
||||||
|
assert sleeper_ctx.cancel_called_remote[0] == 'sleeper'
|
||||||
|
|
||||||
|
# await tractor.pause()
|
||||||
|
raise # always to ensure teardown
|
||||||
|
|
||||||
|
if error_during_ctxerr_handling:
|
||||||
|
with pytest.raises(RuntimeError) as excinfo:
|
||||||
|
trio.run(main)
|
||||||
|
else:
|
||||||
|
|
||||||
|
with pytest.raises(ContextCancelled) as excinfo:
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
assert excinfo.value.type == ContextCancelled
|
||||||
|
assert excinfo.value.canceller[0] == 'canceller'
|
||||||
|
|
Loading…
Reference in New Issue