forked from goodboy/tractor
Adjust test details where `Context.cancel()` is called
We can now make asserts on `.cancelled_caught` and `_remote_error` vs. `_local_error`. Expect a runtime error when `Context.open_stream()` is called AFTER `.cancel()` and the remote `ContextCancelled` hasn't arrived (yet). Adjust to `'itself'` string in self-cancel case.multihomed
parent
b77d123edd
commit
ecb525a2bc
|
@ -13,6 +13,11 @@ from typing import Optional
|
||||||
import pytest
|
import pytest
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
|
from tractor import (
|
||||||
|
Actor,
|
||||||
|
Context,
|
||||||
|
current_actor,
|
||||||
|
)
|
||||||
from tractor._exceptions import (
|
from tractor._exceptions import (
|
||||||
StreamOverrun,
|
StreamOverrun,
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
|
@ -193,9 +198,6 @@ def test_simple_context(
|
||||||
else:
|
else:
|
||||||
assert await ctx.result() == 'yo'
|
assert await ctx.result() == 'yo'
|
||||||
|
|
||||||
if not error_parent:
|
|
||||||
await ctx.cancel()
|
|
||||||
|
|
||||||
if pointlessly_open_stream:
|
if pointlessly_open_stream:
|
||||||
async with ctx.open_stream():
|
async with ctx.open_stream():
|
||||||
if error_parent:
|
if error_parent:
|
||||||
|
@ -208,10 +210,15 @@ def test_simple_context(
|
||||||
# 'stop' msg to the far end which needs
|
# 'stop' msg to the far end which needs
|
||||||
# to be ignored
|
# to be ignored
|
||||||
pass
|
pass
|
||||||
|
|
||||||
else:
|
else:
|
||||||
if error_parent:
|
if error_parent:
|
||||||
raise error_parent
|
raise error_parent
|
||||||
|
|
||||||
|
# cancel AFTER we open a stream
|
||||||
|
# to avoid a cancel raised inside
|
||||||
|
# `.open_stream()`
|
||||||
|
await ctx.cancel()
|
||||||
finally:
|
finally:
|
||||||
|
|
||||||
# after cancellation
|
# after cancellation
|
||||||
|
@ -276,7 +283,7 @@ def test_caller_cancels(
|
||||||
assert (
|
assert (
|
||||||
tuple(err.canceller)
|
tuple(err.canceller)
|
||||||
==
|
==
|
||||||
tractor.current_actor().uid
|
current_actor().uid
|
||||||
)
|
)
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
@ -430,9 +437,11 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
|
||||||
):
|
):
|
||||||
'caller context closes without using stream'
|
'caller context closes without using stream'
|
||||||
|
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as an:
|
||||||
|
|
||||||
portal = await n.start_actor(
|
root: Actor = current_actor()
|
||||||
|
|
||||||
|
portal = await an.start_actor(
|
||||||
'ctx_cancelled',
|
'ctx_cancelled',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
)
|
)
|
||||||
|
@ -440,10 +449,10 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
|
||||||
async with portal.open_context(
|
async with portal.open_context(
|
||||||
expect_cancelled,
|
expect_cancelled,
|
||||||
) as (ctx, sent):
|
) as (ctx, sent):
|
||||||
await portal.run(assert_state, value=True)
|
|
||||||
|
|
||||||
assert sent is None
|
assert sent is None
|
||||||
|
|
||||||
|
await portal.run(assert_state, value=True)
|
||||||
|
|
||||||
# call cancel explicitly
|
# call cancel explicitly
|
||||||
if use_ctx_cancel_method:
|
if use_ctx_cancel_method:
|
||||||
|
|
||||||
|
@ -454,8 +463,21 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
|
||||||
async for msg in stream:
|
async for msg in stream:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
except tractor.ContextCancelled:
|
except tractor.ContextCancelled as ctxc:
|
||||||
raise # XXX: must be propagated to __aexit__
|
# XXX: the cause is US since we call
|
||||||
|
# `Context.cancel()` just above!
|
||||||
|
assert (
|
||||||
|
ctxc.canceller
|
||||||
|
==
|
||||||
|
current_actor().uid
|
||||||
|
==
|
||||||
|
root.uid
|
||||||
|
)
|
||||||
|
|
||||||
|
# XXX: must be propagated to __aexit__
|
||||||
|
# and should be silently absorbed there
|
||||||
|
# since we called `.cancel()` just above ;)
|
||||||
|
raise
|
||||||
|
|
||||||
else:
|
else:
|
||||||
assert 0, "Should have context cancelled?"
|
assert 0, "Should have context cancelled?"
|
||||||
|
@ -472,7 +494,13 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
|
||||||
await ctx.result()
|
await ctx.result()
|
||||||
assert 0, "Callee should have blocked!?"
|
assert 0, "Callee should have blocked!?"
|
||||||
except trio.TooSlowError:
|
except trio.TooSlowError:
|
||||||
|
# NO-OP -> since already called above
|
||||||
await ctx.cancel()
|
await ctx.cancel()
|
||||||
|
|
||||||
|
# local scope should have absorbed the cancellation
|
||||||
|
assert ctx.cancelled_caught
|
||||||
|
assert ctx._remote_error is ctx._local_error
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
async for msg in stream:
|
async for msg in stream:
|
||||||
|
@ -551,19 +579,25 @@ async def cancel_self(
|
||||||
global _state
|
global _state
|
||||||
_state = True
|
_state = True
|
||||||
|
|
||||||
|
# since we call this the below `.open_stream()` should always
|
||||||
|
# error!
|
||||||
await ctx.cancel()
|
await ctx.cancel()
|
||||||
|
|
||||||
# should inline raise immediately
|
# should inline raise immediately
|
||||||
try:
|
try:
|
||||||
async with ctx.open_stream():
|
async with ctx.open_stream():
|
||||||
pass
|
pass
|
||||||
except tractor.ContextCancelled:
|
# except tractor.ContextCancelled:
|
||||||
|
except RuntimeError:
|
||||||
# suppress for now so we can do checkpoint tests below
|
# suppress for now so we can do checkpoint tests below
|
||||||
pass
|
print('Got expected runtime error for stream-after-cancel')
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise RuntimeError('Context didnt cancel itself?!')
|
raise RuntimeError('Context didnt cancel itself?!')
|
||||||
|
|
||||||
# check a real ``trio.Cancelled`` is raised on a checkpoint
|
# check that``trio.Cancelled`` is now raised on any further
|
||||||
|
# checkpoints since the self cancel above will have cancelled
|
||||||
|
# the `Context._scope.cancel_scope: trio.CancelScope`
|
||||||
try:
|
try:
|
||||||
with trio.fail_after(0.1):
|
with trio.fail_after(0.1):
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
@ -574,6 +608,7 @@ async def cancel_self(
|
||||||
# should never get here
|
# should never get here
|
||||||
assert 0
|
assert 0
|
||||||
|
|
||||||
|
raise RuntimeError('Context didnt cancel itself?!')
|
||||||
|
|
||||||
@tractor_test
|
@tractor_test
|
||||||
async def test_callee_cancels_before_started():
|
async def test_callee_cancels_before_started():
|
||||||
|
@ -601,7 +636,7 @@ async def test_callee_cancels_before_started():
|
||||||
ce.type == trio.Cancelled
|
ce.type == trio.Cancelled
|
||||||
|
|
||||||
# the traceback should be informative
|
# the traceback should be informative
|
||||||
assert 'cancelled itself' in ce.msgdata['tb_str']
|
assert 'itself' in ce.msgdata['tb_str']
|
||||||
|
|
||||||
# teardown the actor
|
# teardown the actor
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
@ -773,7 +808,7 @@ async def echo_back_sequence(
|
||||||
|
|
||||||
print(
|
print(
|
||||||
'EXITING CALLEEE:\n'
|
'EXITING CALLEEE:\n'
|
||||||
f'{ctx.cancel_called_remote}'
|
f'{ctx.canceller}'
|
||||||
)
|
)
|
||||||
return 'yo'
|
return 'yo'
|
||||||
|
|
||||||
|
@ -871,7 +906,7 @@ def test_maybe_allow_overruns_stream(
|
||||||
|
|
||||||
if cancel_ctx:
|
if cancel_ctx:
|
||||||
assert isinstance(res, ContextCancelled)
|
assert isinstance(res, ContextCancelled)
|
||||||
assert tuple(res.canceller) == tractor.current_actor().uid
|
assert tuple(res.canceller) == current_actor().uid
|
||||||
|
|
||||||
else:
|
else:
|
||||||
print(f'RX ROOT SIDE RESULT {res}')
|
print(f'RX ROOT SIDE RESULT {res}')
|
||||||
|
|
Loading…
Reference in New Issue