forked from goodboy/tractor
Update ctx test suites to stricter semantics
Including mostly tweaking asserts on relayed `ContextCancelled`s and the new pub ctx properties: `.outcome`, `.maybe_error`, etc. as it pertains to graceful (absorbed) remote cancellation vs. loud ctxc cases expected to be raised by any `Portal.cancel_actor()` style teardown. Start checking a variety internals like `._remote/local_error`, `._is_self_cancelled()`, `._is_final_result_set()`, `._cancel_msg` where applicable. Also factor out the new `expect_ctxc()` checker to our `conftest.py` for use in other suites.modden_spawn_from_client_req
parent
c36deb1f4d
commit
2e797ef7ee
|
@ -1,6 +1,7 @@
|
||||||
"""
|
"""
|
||||||
``tractor`` testing!!
|
``tractor`` testing!!
|
||||||
"""
|
"""
|
||||||
|
from contextlib import asynccontextmanager as acm
|
||||||
import sys
|
import sys
|
||||||
import subprocess
|
import subprocess
|
||||||
import os
|
import os
|
||||||
|
@ -292,3 +293,26 @@ def daemon(
|
||||||
time.sleep(_PROC_SPAWN_WAIT)
|
time.sleep(_PROC_SPAWN_WAIT)
|
||||||
yield proc
|
yield proc
|
||||||
sig_prog(proc, _INT_SIGNAL)
|
sig_prog(proc, _INT_SIGNAL)
|
||||||
|
|
||||||
|
|
||||||
|
@acm
|
||||||
|
async def expect_ctxc(
|
||||||
|
yay: bool,
|
||||||
|
reraise: bool = False,
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Small acm to catch `ContextCancelled` errors when expected
|
||||||
|
below it in a `async with ()` block.
|
||||||
|
|
||||||
|
'''
|
||||||
|
if yay:
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
raise RuntimeError('Never raised ctxc?')
|
||||||
|
except tractor.ContextCancelled:
|
||||||
|
if reraise:
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
yield
|
||||||
|
|
|
@ -5,7 +5,6 @@ Verify the we raise errors when streams are opened prior to
|
||||||
sync-opening a ``tractor.Context`` beforehand.
|
sync-opening a ``tractor.Context`` beforehand.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from contextlib import asynccontextmanager as acm
|
|
||||||
from itertools import count
|
from itertools import count
|
||||||
import platform
|
import platform
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
|
@ -26,7 +25,10 @@ from tractor._exceptions import (
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
)
|
)
|
||||||
|
|
||||||
from conftest import tractor_test
|
from conftest import (
|
||||||
|
tractor_test,
|
||||||
|
expect_ctxc,
|
||||||
|
)
|
||||||
|
|
||||||
# ``Context`` semantics are as follows,
|
# ``Context`` semantics are as follows,
|
||||||
# ------------------------------------
|
# ------------------------------------
|
||||||
|
@ -194,12 +196,13 @@ def test_simple_context(
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with portal.open_context(
|
async with (
|
||||||
|
portal.open_context(
|
||||||
simple_setup_teardown,
|
simple_setup_teardown,
|
||||||
data=10,
|
data=10,
|
||||||
block_forever=callee_blocks_forever,
|
block_forever=callee_blocks_forever,
|
||||||
) as (ctx, sent):
|
) as (ctx, sent),
|
||||||
|
):
|
||||||
assert sent == 11
|
assert sent == 11
|
||||||
|
|
||||||
if callee_blocks_forever:
|
if callee_blocks_forever:
|
||||||
|
@ -250,17 +253,6 @@ def test_simple_context(
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
@acm
|
|
||||||
async def expect_ctxc(yay: bool) -> None:
|
|
||||||
if yay:
|
|
||||||
try:
|
|
||||||
yield
|
|
||||||
except ContextCancelled:
|
|
||||||
return
|
|
||||||
else:
|
|
||||||
yield
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'callee_returns_early',
|
'callee_returns_early',
|
||||||
[True, False],
|
[True, False],
|
||||||
|
@ -293,6 +285,7 @@ def test_caller_cancels(
|
||||||
) -> None:
|
) -> None:
|
||||||
actor: Actor = current_actor()
|
actor: Actor = current_actor()
|
||||||
uid: tuple = actor.uid
|
uid: tuple = actor.uid
|
||||||
|
_ctxc: ContextCancelled|None = None
|
||||||
|
|
||||||
if (
|
if (
|
||||||
cancel_method == 'portal'
|
cancel_method == 'portal'
|
||||||
|
@ -303,6 +296,9 @@ def test_caller_cancels(
|
||||||
assert 0, 'Portal cancel should raise!'
|
assert 0, 'Portal cancel should raise!'
|
||||||
|
|
||||||
except ContextCancelled as ctxc:
|
except ContextCancelled as ctxc:
|
||||||
|
# with trio.CancelScope(shield=True):
|
||||||
|
# await tractor.pause()
|
||||||
|
_ctxc = ctxc
|
||||||
assert ctx.chan._cancel_called
|
assert ctx.chan._cancel_called
|
||||||
assert ctxc.canceller == uid
|
assert ctxc.canceller == uid
|
||||||
assert ctxc is ctx.maybe_error
|
assert ctxc is ctx.maybe_error
|
||||||
|
@ -311,7 +307,10 @@ def test_caller_cancels(
|
||||||
# case since self-cancellation should swallow the ctxc
|
# case since self-cancellation should swallow the ctxc
|
||||||
# silently!
|
# silently!
|
||||||
else:
|
else:
|
||||||
|
try:
|
||||||
res = await ctx.result()
|
res = await ctx.result()
|
||||||
|
except ContextCancelled as ctxc:
|
||||||
|
pytest.fail(f'should not have raised ctxc\n{ctxc}')
|
||||||
|
|
||||||
# we actually get a result
|
# we actually get a result
|
||||||
if callee_returns_early:
|
if callee_returns_early:
|
||||||
|
@ -342,6 +341,10 @@ def test_caller_cancels(
|
||||||
# await tractor.pause()
|
# await tractor.pause()
|
||||||
# assert ctx._local_error is None
|
# assert ctx._local_error is None
|
||||||
|
|
||||||
|
# TODO: don't need this right?
|
||||||
|
# if _ctxc:
|
||||||
|
# raise _ctxc
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
|
||||||
|
@ -352,11 +355,19 @@ def test_caller_cancels(
|
||||||
'simple_context',
|
'simple_context',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
)
|
)
|
||||||
timeout = 0.5 if not callee_returns_early else 2
|
timeout: float = (
|
||||||
|
0.5
|
||||||
|
if not callee_returns_early
|
||||||
|
else 2
|
||||||
|
)
|
||||||
with trio.fail_after(timeout):
|
with trio.fail_after(timeout):
|
||||||
async with (
|
async with (
|
||||||
|
expect_ctxc(
|
||||||
expect_ctxc(yay=cancel_method == 'portal'),
|
yay=(
|
||||||
|
not callee_returns_early
|
||||||
|
and cancel_method == 'portal'
|
||||||
|
)
|
||||||
|
),
|
||||||
|
|
||||||
portal.open_context(
|
portal.open_context(
|
||||||
simple_setup_teardown,
|
simple_setup_teardown,
|
||||||
|
@ -372,10 +383,18 @@ def test_caller_cancels(
|
||||||
await trio.sleep(0.5)
|
await trio.sleep(0.5)
|
||||||
|
|
||||||
if cancel_method == 'ctx':
|
if cancel_method == 'ctx':
|
||||||
|
print('cancelling with `Context.cancel()`')
|
||||||
await ctx.cancel()
|
await ctx.cancel()
|
||||||
else:
|
|
||||||
|
elif cancel_method == 'portal':
|
||||||
|
print('cancelling with `Portal.cancel_actor()`')
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
else:
|
||||||
|
pytest.fail(
|
||||||
|
f'Unknown `cancel_method={cancel_method} ?'
|
||||||
|
)
|
||||||
|
|
||||||
if chk_ctx_result_before_exit:
|
if chk_ctx_result_before_exit:
|
||||||
await check_canceller(ctx)
|
await check_canceller(ctx)
|
||||||
|
|
||||||
|
@ -385,15 +404,22 @@ def test_caller_cancels(
|
||||||
if cancel_method != 'portal':
|
if cancel_method != 'portal':
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
# since the `.cancel_actor()` call just above
|
# XXX NOTE XXX: non-normal yet purposeful
|
||||||
# will cause the `.open_context().__aexit__()` raise
|
# test-specific ctxc suppression is implemented!
|
||||||
# a ctxc which should in turn cause `ctx._scope` to
|
#
|
||||||
|
# WHY: the `.cancel_actor()` case (cancel_method='portal')
|
||||||
|
# will cause both:
|
||||||
|
# * the `ctx.result()` inside `.open_context().__aexit__()`
|
||||||
|
# * AND the `ctx.result()` inside `check_canceller()`
|
||||||
|
# to raise ctxc.
|
||||||
|
#
|
||||||
|
# which should in turn cause `ctx._scope` to
|
||||||
# catch any cancellation?
|
# catch any cancellation?
|
||||||
if (
|
if (
|
||||||
not callee_returns_early
|
not callee_returns_early
|
||||||
and cancel_method == 'portal'
|
and cancel_method != 'portal'
|
||||||
):
|
):
|
||||||
assert ctx._scope.cancelled_caught
|
assert not ctx._scope.cancelled_caught
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
@ -511,6 +537,23 @@ async def expect_cancelled(
|
||||||
await stream.send(msg) # echo server
|
await stream.send(msg) # echo server
|
||||||
|
|
||||||
except trio.Cancelled:
|
except trio.Cancelled:
|
||||||
|
|
||||||
|
# on ctx.cancel() the internal RPC scope is cancelled but
|
||||||
|
# never caught until the func exits.
|
||||||
|
assert ctx._scope.cancel_called
|
||||||
|
assert not ctx._scope.cancelled_caught
|
||||||
|
|
||||||
|
# should be the RPC cmd request for `._cancel_task()`
|
||||||
|
assert ctx._cancel_msg
|
||||||
|
# which, has not yet resolved to an error outcome
|
||||||
|
# since this rpc func has not yet exited.
|
||||||
|
assert not ctx.maybe_error
|
||||||
|
assert not ctx._final_result_is_set()
|
||||||
|
|
||||||
|
# debug REPL if needed
|
||||||
|
# with trio.CancelScope(shield=True):
|
||||||
|
# await tractor.pause()
|
||||||
|
|
||||||
# expected case
|
# expected case
|
||||||
_state = False
|
_state = False
|
||||||
raise
|
raise
|
||||||
|
@ -594,16 +637,16 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
|
||||||
with trio.fail_after(0.2):
|
with trio.fail_after(0.2):
|
||||||
await ctx.result()
|
await ctx.result()
|
||||||
assert 0, "Callee should have blocked!?"
|
assert 0, "Callee should have blocked!?"
|
||||||
|
|
||||||
except trio.TooSlowError:
|
except trio.TooSlowError:
|
||||||
# NO-OP -> since already called above
|
# NO-OP -> since already called above
|
||||||
await ctx.cancel()
|
await ctx.cancel()
|
||||||
|
|
||||||
# NOTE: local scope should have absorbed the cancellation since
|
# NOTE: local scope should have absorbed the cancellation since
|
||||||
# in this case we call `ctx.cancel()` and the local
|
# in this case we call `ctx.cancel()` and the local
|
||||||
# `._scope` gets `.cancel_called` on the ctxc ack.
|
# `._scope` does not get `.cancel_called` and thus
|
||||||
|
# `.cancelled_caught` neither will ever bet set.
|
||||||
if use_ctx_cancel_method:
|
if use_ctx_cancel_method:
|
||||||
assert ctx._scope.cancelled_caught
|
assert not ctx._scope.cancelled_caught
|
||||||
|
|
||||||
# rxed ctxc response from far end
|
# rxed ctxc response from far end
|
||||||
assert ctx.cancel_acked
|
assert ctx.cancel_acked
|
||||||
|
|
|
@ -238,7 +238,12 @@ async def stream_from_peer(
|
||||||
|
|
||||||
assert peer_ctx._remote_error is ctxerr
|
assert peer_ctx._remote_error is ctxerr
|
||||||
assert peer_ctx._remote_error.msgdata == ctxerr.msgdata
|
assert peer_ctx._remote_error.msgdata == ctxerr.msgdata
|
||||||
assert peer_ctx.canceller == ctxerr.canceller
|
|
||||||
|
# the peer ctx is the canceller even though it's canceller
|
||||||
|
# is the "canceller" XD
|
||||||
|
assert peer_name in peer_ctx.canceller
|
||||||
|
|
||||||
|
assert "canceller" in ctxerr.canceller
|
||||||
|
|
||||||
# caller peer should not be the cancel requester
|
# caller peer should not be the cancel requester
|
||||||
assert not ctx.cancel_called
|
assert not ctx.cancel_called
|
||||||
|
@ -272,7 +277,6 @@ async def stream_from_peer(
|
||||||
|
|
||||||
# root/parent actor task should NEVER HAVE cancelled us!
|
# root/parent actor task should NEVER HAVE cancelled us!
|
||||||
assert not ctx.canceller
|
assert not ctx.canceller
|
||||||
assert 'canceller' in peer_ctx.canceller
|
|
||||||
|
|
||||||
raise
|
raise
|
||||||
# TODO: IN THEORY we could have other cases depending on
|
# TODO: IN THEORY we could have other cases depending on
|
||||||
|
@ -527,27 +531,24 @@ def test_peer_canceller(
|
||||||
|
|
||||||
assert ctx.cancel_called
|
assert ctx.cancel_called
|
||||||
|
|
||||||
if (
|
if ctx is sleeper_ctx:
|
||||||
ctx is sleeper_ctx
|
assert 'canceller' in re.canceller
|
||||||
or ctx is caller_ctx
|
assert 'sleeper' in ctx.canceller
|
||||||
):
|
|
||||||
assert (
|
|
||||||
re.canceller
|
|
||||||
==
|
|
||||||
ctx.canceller
|
|
||||||
==
|
|
||||||
canceller.channel.uid
|
|
||||||
)
|
|
||||||
|
|
||||||
else:
|
if ctx is canceller_ctx:
|
||||||
assert (
|
assert (
|
||||||
re.canceller
|
re.canceller
|
||||||
==
|
==
|
||||||
ctx.canceller
|
|
||||||
==
|
|
||||||
root.uid
|
root.uid
|
||||||
)
|
)
|
||||||
|
|
||||||
|
else: # the other 2 ctxs
|
||||||
|
assert (
|
||||||
|
re.canceller
|
||||||
|
==
|
||||||
|
canceller.channel.uid
|
||||||
|
)
|
||||||
|
|
||||||
# since the sleeper errors while handling a
|
# since the sleeper errors while handling a
|
||||||
# peer-cancelled (by ctxc) scenario, we expect
|
# peer-cancelled (by ctxc) scenario, we expect
|
||||||
# that the `.open_context()` block DOES call
|
# that the `.open_context()` block DOES call
|
||||||
|
@ -576,14 +577,16 @@ def test_peer_canceller(
|
||||||
assert not sleeper_ctx._scope.cancelled_caught
|
assert not sleeper_ctx._scope.cancelled_caught
|
||||||
|
|
||||||
assert isinstance(loc_err, ContextCancelled)
|
assert isinstance(loc_err, ContextCancelled)
|
||||||
assert loc_err.canceller == sleeper_ctx.canceller
|
|
||||||
assert (
|
# the received remote error's `.canceller`
|
||||||
loc_err.canceller[0]
|
# will of course be the "canceller" actor BUT
|
||||||
==
|
# the canceller set on the local handle to
|
||||||
sleeper_ctx.canceller[0]
|
# `sleeper_ctx` will be the "sleeper" uid
|
||||||
==
|
# since it's the actor that relayed us the
|
||||||
'canceller'
|
# error which was **caused** by the
|
||||||
)
|
# "canceller".
|
||||||
|
assert 'sleeper' in sleeper_ctx.canceller
|
||||||
|
assert 'canceller' == loc_err.canceller[0]
|
||||||
|
|
||||||
# the sleeper's remote error is the error bubbled
|
# the sleeper's remote error is the error bubbled
|
||||||
# out of the context-stack above!
|
# out of the context-stack above!
|
||||||
|
|
Loading…
Reference in New Issue