Ensure kbi will cancel context block

Follow up to previous commit: extend our simple context test set to
include cancellation via kbi in the parent as well as timeout logic and
testing of the parent opening a stream even though the target actor does
not.

Thanks again to https://github.com/adder46/wrath for discovering this
bug.
fix_kbi_in_ctx_block
Tyler Goodlet 2021-10-04 08:59:24 -04:00
parent c1727ce05e
commit 8d79d83ac2
1 changed files with 51 additions and 24 deletions

View File

@ -51,53 +51,80 @@ async def assert_state(value: bool):
@pytest.mark.parametrize(
'error_parent',
[False, True],
[False, ValueError, KeyboardInterrupt],
)
@pytest.mark.parametrize(
'callee_blocks_forever',
[False, True],
ids=lambda item: f'callee_blocks_forever={item}'
)
@pytest.mark.parametrize(
'pointlessly_open_stream',
[False, True],
ids=lambda item: f'open_stream={item}'
)
def test_simple_context(
error_parent,
callee_blocks_forever,
pointlessly_open_stream,
):
async def main():
async with tractor.open_nursery() as n:
with trio.fail_after(1.5):
async with tractor.open_nursery() as nursery:
portal = await n.start_actor(
'simple_context',
enable_modules=[__name__],
)
portal = await nursery.start_actor(
'simple_context',
enable_modules=[__name__],
)
async with portal.open_context(
simple_setup_teardown,
data=10,
block_forever=callee_blocks_forever,
) as (ctx, sent):
try:
async with portal.open_context(
simple_setup_teardown,
data=10,
block_forever=callee_blocks_forever,
) as (ctx, sent):
assert sent == 11
assert sent == 11
if callee_blocks_forever:
await portal.run(assert_state, value=True)
await ctx.cancel()
else:
assert await ctx.result() == 'yo'
if callee_blocks_forever:
await portal.run(assert_state, value=True)
else:
assert await ctx.result() == 'yo'
# after cancellation
await portal.run(assert_state, value=False)
if not error_parent:
await ctx.cancel()
if error_parent:
raise ValueError
if pointlessly_open_stream:
async with ctx.open_stream():
if error_parent:
raise error_parent
# shut down daemon
await portal.cancel_actor()
if callee_blocks_forever:
await ctx.cancel()
else:
# in this case the stream will send a
# 'stop' msg to the far end which needs
# to be ignored
pass
else:
if error_parent:
raise error_parent
finally:
# after cancellation
if not error_parent:
await portal.run(assert_state, value=False)
# shut down daemon
await portal.cancel_actor()
if error_parent:
try:
trio.run(main)
except ValueError:
except error_parent:
pass
else:
trio.run(main)