Add detailed ``@tractor.context`` cancellation/termination tests
parent
ec8743ebc3
commit
db1e3dc353
|
@ -6,20 +6,13 @@ import pytest
|
|||
import trio
|
||||
import tractor
|
||||
|
||||
# from conftest import tractor_test
|
||||
from conftest import tractor_test
|
||||
|
||||
# TODO: test endofchannel semantics / cancellation / error cases:
|
||||
# 3 possible outcomes:
|
||||
# - normal termination: far end relays a stop message with
|
||||
# final value as in async gen from ``return <val>``.
|
||||
|
||||
# possible outcomes:
|
||||
# - normal termination: far end returns
|
||||
# - premature close: far end relays a stop message to tear down stream
|
||||
# - cancel: far end raises `ContextCancelled`
|
||||
|
||||
# future possible outcomes
|
||||
# - restart request: far end raises `ContextRestart`
|
||||
# the general stream semantics are
|
||||
# - normal termination: far end relays a stop message which
|
||||
# terminates an ongoing ``MsgStream`` iteration
|
||||
# - cancel termination: context is cancelled on either side cancelling
|
||||
# the "linked" inter-actor task context
|
||||
|
||||
|
||||
_state: bool = False
|
||||
|
@ -30,6 +23,7 @@ async def simple_setup_teardown(
|
|||
|
||||
ctx: tractor.Context,
|
||||
data: int,
|
||||
block_forever: bool = False,
|
||||
|
||||
) -> None:
|
||||
|
||||
|
@ -41,8 +35,11 @@ async def simple_setup_teardown(
|
|||
await ctx.started(data + 1)
|
||||
|
||||
try:
|
||||
# block until cancelled
|
||||
await trio.sleep_forever()
|
||||
if block_forever:
|
||||
# block until cancelled
|
||||
await trio.sleep_forever()
|
||||
else:
|
||||
return 'yo'
|
||||
finally:
|
||||
_state = False
|
||||
|
||||
|
@ -56,7 +53,14 @@ async def assert_state(value: bool):
|
|||
'error_parent',
|
||||
[False, True],
|
||||
)
|
||||
def test_simple_context(error_parent):
|
||||
@pytest.mark.parametrize(
|
||||
'callee_blocks_forever',
|
||||
[False, True],
|
||||
)
|
||||
def test_simple_context(
|
||||
error_parent,
|
||||
callee_blocks_forever,
|
||||
):
|
||||
|
||||
async def main():
|
||||
|
||||
|
@ -70,11 +74,16 @@ def test_simple_context(error_parent):
|
|||
async with portal.open_context(
|
||||
simple_setup_teardown,
|
||||
data=10,
|
||||
block_forever=callee_blocks_forever,
|
||||
) as (ctx, sent):
|
||||
|
||||
assert sent == 11
|
||||
|
||||
await portal.run(assert_state, value=True)
|
||||
if callee_blocks_forever:
|
||||
await portal.run(assert_state, value=True)
|
||||
await ctx.cancel()
|
||||
else:
|
||||
assert await ctx.result() == 'yo'
|
||||
|
||||
# after cancellation
|
||||
await portal.run(assert_state, value=False)
|
||||
|
@ -94,6 +103,281 @@ def test_simple_context(error_parent):
|
|||
trio.run(main)
|
||||
|
||||
|
||||
# basic stream terminations:
|
||||
# - callee context closes without using stream
|
||||
# - caller context closes without using stream
|
||||
# - caller context calls `Context.cancel()` while streaming
|
||||
# is ongoing resulting in callee being cancelled
|
||||
# - callee calls `Context.cancel()` while streaming and caller
|
||||
# sees stream terminated in `RemoteActorError`
|
||||
|
||||
# TODO: future possible features
|
||||
# - restart request: far end raises `ContextRestart`
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def close_ctx_immediately(
|
||||
|
||||
ctx: tractor.Context,
|
||||
|
||||
) -> None:
|
||||
|
||||
await ctx.started()
|
||||
global _state
|
||||
|
||||
async with ctx.open_stream():
|
||||
pass
|
||||
|
||||
|
||||
@tractor_test
|
||||
async def test_callee_closes_ctx_after_stream_open():
|
||||
'callee context closes without using stream'
|
||||
|
||||
async with tractor.open_nursery() as n:
|
||||
|
||||
portal = await n.start_actor(
|
||||
'fast_stream_closer',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
||||
async with portal.open_context(
|
||||
close_ctx_immediately,
|
||||
|
||||
# flag to avoid waiting the final result
|
||||
# cancel_on_exit=True,
|
||||
|
||||
) as (ctx, sent):
|
||||
|
||||
assert sent is None
|
||||
|
||||
with trio.fail_after(0.5):
|
||||
async with ctx.open_stream() as stream:
|
||||
|
||||
# should fall through since ``StopAsyncIteration``
|
||||
# should be raised through translation of
|
||||
# a ``trio.EndOfChannel`` by
|
||||
# ``trio.abc.ReceiveChannel.__anext__()``
|
||||
async for _ in stream:
|
||||
assert 0
|
||||
else:
|
||||
|
||||
# verify stream is now closed
|
||||
try:
|
||||
await stream.receive()
|
||||
except trio.EndOfChannel:
|
||||
pass
|
||||
|
||||
# TODO: should be just raise the closed resource err
|
||||
# directly here to enforce not allowing a re-open
|
||||
# of a stream to the context (at least until a time of
|
||||
# if/when we decide that's a good idea?)
|
||||
try:
|
||||
async with ctx.open_stream() as stream:
|
||||
pass
|
||||
except trio.ClosedResourceError:
|
||||
pass
|
||||
|
||||
await portal.cancel_actor()
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def expect_cancelled(
|
||||
|
||||
ctx: tractor.Context,
|
||||
|
||||
) -> None:
|
||||
global _state
|
||||
_state = True
|
||||
|
||||
await ctx.started()
|
||||
|
||||
try:
|
||||
async with ctx.open_stream() as stream:
|
||||
async for msg in stream:
|
||||
await stream.send(msg) # echo server
|
||||
|
||||
except trio.Cancelled:
|
||||
# expected case
|
||||
_state = False
|
||||
raise
|
||||
|
||||
else:
|
||||
assert 0, "Wasn't cancelled!?"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'use_ctx_cancel_method',
|
||||
[False, True],
|
||||
)
|
||||
@tractor_test
|
||||
async def test_caller_closes_ctx_after_callee_opens_stream(
|
||||
use_ctx_cancel_method: bool,
|
||||
):
|
||||
'caller context closes without using stream'
|
||||
|
||||
async with tractor.open_nursery() as n:
|
||||
|
||||
portal = await n.start_actor(
|
||||
'ctx_cancelled',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
||||
async with portal.open_context(
|
||||
expect_cancelled,
|
||||
) as (ctx, sent):
|
||||
await portal.run(assert_state, value=True)
|
||||
|
||||
assert sent is None
|
||||
|
||||
# call cancel explicitly
|
||||
if use_ctx_cancel_method:
|
||||
|
||||
await ctx.cancel()
|
||||
|
||||
try:
|
||||
async with ctx.open_stream() as stream:
|
||||
async for msg in stream:
|
||||
pass
|
||||
|
||||
except tractor.ContextCancelled:
|
||||
raise # XXX: must be propagated to __aexit__
|
||||
|
||||
else:
|
||||
assert 0, "Should have context cancelled?"
|
||||
|
||||
# channel should still be up
|
||||
assert portal.channel.connected()
|
||||
|
||||
# ctx is closed here
|
||||
await portal.run(assert_state, value=False)
|
||||
|
||||
else:
|
||||
try:
|
||||
with trio.fail_after(0.2):
|
||||
await ctx.result()
|
||||
assert 0, "Callee should have blocked!?"
|
||||
except trio.TooSlowError:
|
||||
await ctx.cancel()
|
||||
try:
|
||||
async with ctx.open_stream() as stream:
|
||||
async for msg in stream:
|
||||
pass
|
||||
except trio.ClosedResourceError:
|
||||
pass
|
||||
else:
|
||||
assert 0, "Should have received closed resource error?"
|
||||
|
||||
# ctx is closed here
|
||||
await portal.run(assert_state, value=False)
|
||||
|
||||
# channel should not have been destroyed yet, only the
|
||||
# inter-actor-task context
|
||||
assert portal.channel.connected()
|
||||
|
||||
# teardown the actor
|
||||
await portal.cancel_actor()
|
||||
|
||||
|
||||
@tractor_test
|
||||
async def test_multitask_caller_cancels_from_nonroot_task():
|
||||
|
||||
async with tractor.open_nursery() as n:
|
||||
|
||||
portal = await n.start_actor(
|
||||
'ctx_cancelled',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
||||
async with portal.open_context(
|
||||
expect_cancelled,
|
||||
) as (ctx, sent):
|
||||
|
||||
await portal.run(assert_state, value=True)
|
||||
assert sent is None
|
||||
|
||||
async with ctx.open_stream() as stream:
|
||||
|
||||
async def send_msg_then_cancel():
|
||||
await stream.send('yo')
|
||||
await portal.run(assert_state, value=True)
|
||||
await ctx.cancel()
|
||||
await portal.run(assert_state, value=False)
|
||||
|
||||
async with trio.open_nursery() as n:
|
||||
n.start_soon(send_msg_then_cancel)
|
||||
|
||||
try:
|
||||
async for msg in stream:
|
||||
assert msg == 'yo'
|
||||
|
||||
except tractor.ContextCancelled:
|
||||
raise # XXX: must be propagated to __aexit__
|
||||
|
||||
# channel should still be up
|
||||
assert portal.channel.connected()
|
||||
|
||||
# ctx is closed here
|
||||
await portal.run(assert_state, value=False)
|
||||
|
||||
# channel should not have been destroyed yet, only the
|
||||
# inter-actor-task context
|
||||
assert portal.channel.connected()
|
||||
|
||||
# teardown the actor
|
||||
await portal.cancel_actor()
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def cancel_self(
|
||||
|
||||
ctx: tractor.Context,
|
||||
|
||||
) -> None:
|
||||
global _state
|
||||
_state = True
|
||||
|
||||
await ctx.cancel()
|
||||
try:
|
||||
with trio.fail_after(0.1):
|
||||
await trio.sleep_forever()
|
||||
|
||||
except trio.Cancelled:
|
||||
raise
|
||||
|
||||
except trio.TooSlowError:
|
||||
# should never get here
|
||||
assert 0
|
||||
|
||||
|
||||
@tractor_test
|
||||
async def test_callee_cancels_before_started():
|
||||
'''callee calls `Context.cancel()` while streaming and caller
|
||||
sees stream terminated in `ContextCancelled`.
|
||||
|
||||
'''
|
||||
async with tractor.open_nursery() as n:
|
||||
|
||||
portal = await n.start_actor(
|
||||
'cancels_self',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
try:
|
||||
|
||||
async with portal.open_context(
|
||||
cancel_self,
|
||||
) as (ctx, sent):
|
||||
async with ctx.open_stream():
|
||||
|
||||
await trio.sleep_forever()
|
||||
|
||||
# raises a special cancel signal
|
||||
except tractor.ContextCancelled as ce:
|
||||
ce.type == trio.Cancelled
|
||||
|
||||
# teardown the actor
|
||||
await portal.cancel_actor()
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def simple_rpc(
|
||||
|
||||
|
@ -207,6 +491,8 @@ def test_simple_rpc(server_func, use_async_for):
|
|||
|
||||
# stream should terminate here
|
||||
|
||||
# final context result(s) should be consumed here in __aexit__()
|
||||
|
||||
await portal.cancel_actor()
|
||||
|
||||
trio.run(main)
|
||||
|
|
Loading…
Reference in New Issue