Add new set of context cancellation tests
These will verify new changes to the runtime/messaging core which allows us to adopt an "ignore cancel if requested by us" style handling of `ContextCancelled` more like how `trio` does with `trio.Nursery.cancel_scope.cancel()`. We now expect a `ContextCancelled.canceller: tuple` which is set to the actor uid of the actor which requested the cancellation which eventually resulted in the remote error-msg. Also adds some experimental tweaks to the "backpressure" test which it turns out is very problematic in coordination with context cancellation since blocking on the feed mem chan to some task will block the ipc msg loop and thus handling of cancellation.. More to come to both the test and core to address this hopefully since right now this test is failing.proper_breakpoint_hooking
parent
8913829511
commit
7dd5d8d1f8
|
@ -13,7 +13,10 @@ from typing import Optional
|
||||||
import pytest
|
import pytest
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
from tractor._exceptions import StreamOverrun
|
from tractor._exceptions import (
|
||||||
|
StreamOverrun,
|
||||||
|
ContextCancelled,
|
||||||
|
)
|
||||||
|
|
||||||
from conftest import tractor_test
|
from conftest import tractor_test
|
||||||
|
|
||||||
|
@ -91,7 +94,10 @@ async def not_started_but_stream_opened(
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'target',
|
'target',
|
||||||
[too_many_starteds, not_started_but_stream_opened],
|
[
|
||||||
|
too_many_starteds,
|
||||||
|
not_started_but_stream_opened,
|
||||||
|
],
|
||||||
ids='misuse_type={}'.format,
|
ids='misuse_type={}'.format,
|
||||||
)
|
)
|
||||||
def test_started_misuse(target):
|
def test_started_misuse(target):
|
||||||
|
@ -228,6 +234,70 @@ def test_simple_context(
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'cancel_method',
|
||||||
|
['ctx', 'portal'],
|
||||||
|
ids=lambda item: f'cancel_method={item}'
|
||||||
|
)
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'result_before_exit',
|
||||||
|
[True, False],
|
||||||
|
ids=lambda item: f'result_before_exit={item}'
|
||||||
|
)
|
||||||
|
def test_caller_cancels(
|
||||||
|
cancel_method: str,
|
||||||
|
result_before_exit: bool,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Verify that when the opening side of a context (aka the caller)
|
||||||
|
cancels that context, the ctx does not raise a cancelled when
|
||||||
|
either calling `.result()` or on context exit.
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
async def check_canceller(
|
||||||
|
ctx: tractor.Context,
|
||||||
|
) -> None:
|
||||||
|
# should not raise yet return the remote
|
||||||
|
# context cancelled error.
|
||||||
|
err = await ctx.result()
|
||||||
|
assert isinstance(err, ContextCancelled)
|
||||||
|
assert (
|
||||||
|
tuple(err.canceller)
|
||||||
|
==
|
||||||
|
tractor.current_actor().uid
|
||||||
|
)
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
async with tractor.open_nursery() as nursery:
|
||||||
|
portal = await nursery.start_actor(
|
||||||
|
'simple_context',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
)
|
||||||
|
with trio.fail_after(0.5):
|
||||||
|
async with portal.open_context(
|
||||||
|
simple_setup_teardown,
|
||||||
|
data=10,
|
||||||
|
block_forever=True,
|
||||||
|
) as (ctx, sent):
|
||||||
|
|
||||||
|
if cancel_method == 'ctx':
|
||||||
|
await ctx.cancel()
|
||||||
|
else:
|
||||||
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
if result_before_exit:
|
||||||
|
await check_canceller(ctx)
|
||||||
|
|
||||||
|
if not result_before_exit:
|
||||||
|
await check_canceller(ctx)
|
||||||
|
|
||||||
|
if cancel_method != 'portal':
|
||||||
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
# basic stream terminations:
|
# basic stream terminations:
|
||||||
# - callee context closes without using stream
|
# - callee context closes without using stream
|
||||||
# - caller context closes without using stream
|
# - caller context closes without using stream
|
||||||
|
@ -506,7 +576,6 @@ async def test_callee_cancels_before_started():
|
||||||
cancel_self,
|
cancel_self,
|
||||||
) as (ctx, sent):
|
) as (ctx, sent):
|
||||||
async with ctx.open_stream():
|
async with ctx.open_stream():
|
||||||
|
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
# raises a special cancel signal
|
# raises a special cancel signal
|
||||||
|
@ -610,7 +679,7 @@ def test_one_end_stream_not_opened(overrun_by):
|
||||||
|
|
||||||
# 2 overrun cases and the no overrun case (which pushes right up to
|
# 2 overrun cases and the no overrun case (which pushes right up to
|
||||||
# the msg limit)
|
# the msg limit)
|
||||||
if overrunner == 'caller' or 'cance' in overrunner:
|
if overrunner == 'caller' or 'cancel' in overrunner:
|
||||||
with pytest.raises(tractor.RemoteActorError) as excinfo:
|
with pytest.raises(tractor.RemoteActorError) as excinfo:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
@ -634,7 +703,7 @@ async def echo_back_sequence(
|
||||||
|
|
||||||
ctx: tractor.Context,
|
ctx: tractor.Context,
|
||||||
seq: list[int],
|
seq: list[int],
|
||||||
msg_buffer_size: Optional[int] = None,
|
msg_buffer_size: int | None = None,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -644,11 +713,13 @@ async def echo_back_sequence(
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
async with ctx.open_stream(
|
async with ctx.open_stream(
|
||||||
msg_buffer_size=msg_buffer_size,
|
msg_buffer_size=msg_buffer_size,
|
||||||
|
backpressure=True,
|
||||||
) as stream:
|
) as stream:
|
||||||
|
|
||||||
seq = list(seq) # bleh, `msgpack`...
|
seq = list(seq) # bleh, `msgpack`...
|
||||||
count = 0
|
count = 0
|
||||||
while count < 3:
|
# while count < 10:
|
||||||
|
while True:
|
||||||
batch = []
|
batch = []
|
||||||
async for msg in stream:
|
async for msg in stream:
|
||||||
batch.append(msg)
|
batch.append(msg)
|
||||||
|
@ -661,13 +732,17 @@ async def echo_back_sequence(
|
||||||
|
|
||||||
count += 1
|
count += 1
|
||||||
|
|
||||||
|
print("EXITING CALLEEE")
|
||||||
return 'yo'
|
return 'yo'
|
||||||
|
|
||||||
|
|
||||||
def test_stream_backpressure():
|
def test_stream_backpressure(
|
||||||
|
loglevel: str,
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Demonstrate small overruns of each task back and forth
|
Demonstrate small overruns of each task back and forth
|
||||||
on a stream not raising any errors by default.
|
on a stream not raising any errors by default by setting
|
||||||
|
the ``backpressure=True``.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
|
@ -675,16 +750,21 @@ def test_stream_backpressure():
|
||||||
portal = await n.start_actor(
|
portal = await n.start_actor(
|
||||||
'callee_sends_forever',
|
'callee_sends_forever',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
|
loglevel=loglevel,
|
||||||
)
|
)
|
||||||
seq = list(range(3))
|
seq = list(range(3))
|
||||||
async with portal.open_context(
|
async with portal.open_context(
|
||||||
echo_back_sequence,
|
echo_back_sequence,
|
||||||
seq=seq,
|
seq=seq,
|
||||||
msg_buffer_size=1,
|
|
||||||
) as (ctx, sent):
|
) as (ctx, sent):
|
||||||
|
|
||||||
assert sent is None
|
assert sent is None
|
||||||
|
|
||||||
async with ctx.open_stream(msg_buffer_size=1) as stream:
|
async with ctx.open_stream(
|
||||||
|
msg_buffer_size=1,
|
||||||
|
backpressure=True,
|
||||||
|
# allow_overruns=True,
|
||||||
|
) as stream:
|
||||||
count = 0
|
count = 0
|
||||||
while count < 3:
|
while count < 3:
|
||||||
for msg in seq:
|
for msg in seq:
|
||||||
|
@ -693,15 +773,25 @@ def test_stream_backpressure():
|
||||||
await trio.sleep(0.1)
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
batch = []
|
batch = []
|
||||||
|
# with trio.move_on_after(1) as cs:
|
||||||
async for msg in stream:
|
async for msg in stream:
|
||||||
|
print(f'RX {msg}')
|
||||||
batch.append(msg)
|
batch.append(msg)
|
||||||
if batch == seq:
|
if batch == seq:
|
||||||
break
|
break
|
||||||
|
|
||||||
count += 1
|
count += 1
|
||||||
|
|
||||||
|
# if cs.cancelled_caught:
|
||||||
|
# break
|
||||||
|
|
||||||
|
# cancel the remote task
|
||||||
|
# print('SENDING ROOT SIDE CANCEL')
|
||||||
|
# await ctx.cancel()
|
||||||
|
|
||||||
# here the context should return
|
# here the context should return
|
||||||
assert await ctx.result() == 'yo'
|
res = await ctx.result()
|
||||||
|
assert res == 'yo'
|
||||||
|
|
||||||
# cancel the daemon
|
# cancel the daemon
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
@ -737,18 +827,18 @@ async def attach_to_sleep_forever():
|
||||||
finally:
|
finally:
|
||||||
# XXX: previously this would trigger local
|
# XXX: previously this would trigger local
|
||||||
# ``ContextCancelled`` to be received and raised in the
|
# ``ContextCancelled`` to be received and raised in the
|
||||||
# local context overriding any local error due to
|
# local context overriding any local error due to logic
|
||||||
# logic inside ``_invoke()`` which checked for
|
# inside ``_invoke()`` which checked for an error set on
|
||||||
# an error set on ``Context._error`` and raised it in
|
# ``Context._error`` and raised it in a cancellation
|
||||||
# under a cancellation scenario.
|
# scenario.
|
||||||
|
# ------
|
||||||
# The problem is you can have a remote cancellation
|
# The problem is you can have a remote cancellation that
|
||||||
# that is part of a local error and we shouldn't raise
|
# is part of a local error and we shouldn't raise
|
||||||
# ``ContextCancelled`` **iff** we weren't the side of
|
# ``ContextCancelled`` **iff** we **were not** the side
|
||||||
# the context to initiate it, i.e.
|
# of the context to initiate it, i.e.
|
||||||
# ``Context._cancel_called`` should **NOT** have been
|
# ``Context._cancel_called`` should **NOT** have been
|
||||||
# set. The special logic to handle this case is now
|
# set. The special logic to handle this case is now
|
||||||
# inside ``Context._may_raise_from_remote_msg()`` XD
|
# inside ``Context._maybe_raise_from_remote_msg()`` XD
|
||||||
await peer_ctx.cancel()
|
await peer_ctx.cancel()
|
||||||
|
|
||||||
|
|
||||||
|
@ -769,9 +859,10 @@ async def error_before_started(
|
||||||
|
|
||||||
def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
|
def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
|
||||||
'''
|
'''
|
||||||
Verify that an error raised in a remote context which itself opens another
|
Verify that an error raised in a remote context which itself opens
|
||||||
remote context, which it cancels, does not ovverride the original error that
|
another remote context, which it cancels, does not ovverride the
|
||||||
caused the cancellation of the secondardy context.
|
original error that caused the cancellation of the secondardy
|
||||||
|
context.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
|
|
Loading…
Reference in New Issue