tractor/tests/test_cancellation.py

"""
Cancellation and error propagation
"""
import os
import signal
import platform
import time
from itertools import repeat
import pytest
import trio
import tractor
from tractor._testing import (
    tractor_test,
)
from conftest import no_windows


def is_win():
    return platform.system() == 'Windows'


async def assert_err(delay=0):
    await trio.sleep(delay)
    assert 0


async def sleep_forever():
    await trio.sleep_forever()


async def do_nuthin():
    # just nick the scheduler
    await trio.sleep(0)


@pytest.mark.parametrize(
    'args_err',
    [
        # expected to be thrown in assert_err
        ({}, AssertionError),
        # argument mismatch raised in _invoke()
        ({'unexpected': 10}, TypeError),
    ],
    ids=['no_args', 'unexpected_args'],
)
def test_remote_error(reg_addr, args_err):
    '''
    Verify that an error raised in a subactor propagates up to the
    parent nursery, contains the boxed builtin error type info, and
    causes cancellation and re-raising all the way up the stack.

    '''
    args, errtype = args_err

    async def main():
        async with tractor.open_nursery(
            registry_addrs=[reg_addr],
        ) as nursery:

            # on a remote type error caused by bad input args
            # this should raise directly which means we **don't** get
            # an exception group outside the nursery since the error
            # here and the far end task error are one and the same?
            portal = await nursery.run_in_actor(
                assert_err,
                name='errorer',
                **args
            )

            # get result(s) from main task
            try:
                # this means the root actor will also raise a local
                # parent task error and thus an eg will propagate out
                # of this actor nursery.
                await portal.result()
            except tractor.RemoteActorError as err:
                assert err.boxed_type == errtype
                print("Look Maa that actor failed hard, hehh")
                raise

    # ensure boxed errors
    if args:
        with pytest.raises(tractor.RemoteActorError) as excinfo:
            trio.run(main)

        assert excinfo.value.boxed_type == errtype

    else:
        # the root task will also error on the `Portal.result()`
        # call so we expect an error from there AND the child.
        # |_ tho seems like on new `trio` this doesn't always
        #    happen?
        with pytest.raises((
            BaseExceptionGroup,
            tractor.RemoteActorError,
        )) as excinfo:
            trio.run(main)

        # ensure boxed errors are `errtype`
        err: BaseException = excinfo.value
        if isinstance(err, BaseExceptionGroup):
            suberrs: list[BaseException] = err.exceptions
        else:
            suberrs: list[BaseException] = [err]

        for exc in suberrs:
            assert exc.boxed_type == errtype
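

# A minimal usage sketch (not part of the original test set) of the
# error-boxing behaviour verified above: per `test_remote_error()`, a
# failed `run_in_actor()` task re-raises from `Portal.result()` as a
# `tractor.RemoteActorError` whose `.boxed_type` is the builtin error
# raised remotely. Names prefixed `_demo_` are illustrative only.
async def _demo_boxed_error() -> None:
    async with tractor.open_nursery() as an:
        portal = await an.run_in_actor(
            assert_err,
            name='demo_errorer',
        )
        try:
            await portal.result()
        except tractor.RemoteActorError as rae:
            # the subactor's `assert 0` boxes as an `AssertionError`
            assert rae.boxed_type == AssertionError
            raise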


def test_multierror(
    reg_addr: tuple[str, int],
):
    '''
    Verify we raise a ``BaseExceptionGroup`` out of a nursery where
    more than one actor errors.

    '''
    async def main():
        async with tractor.open_nursery(
            registry_addrs=[reg_addr],
        ) as nursery:

            await nursery.run_in_actor(assert_err, name='errorer1')
            portal2 = await nursery.run_in_actor(assert_err, name='errorer2')

            # get result(s) from main task
            try:
                await portal2.result()
            except tractor.RemoteActorError as err:
                assert err.boxed_type == AssertionError
                print("Look Maa that first actor failed hard, hehh")
                raise

    # here we should get a ``BaseExceptionGroup`` containing exceptions
    # from both subactors
    with pytest.raises(BaseExceptionGroup):
        trio.run(main)
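

# A hedged sketch of how caller code might sort the group raised out of
# a multi-failure nursery like `main()` above: `BaseExceptionGroup.split()`
# (stdlib on 3.11+) partitions matching leaf exceptions from the rest.
# The `_demo_*` names are illustrative only.
async def _demo_eg_main() -> None:
    async with tractor.open_nursery() as an:
        await an.run_in_actor(assert_err, name='demo_errorer1')
        await an.run_in_actor(assert_err, name='demo_errorer2')


def _demo_split_eg() -> None:
    try:
        trio.run(_demo_eg_main)
    except BaseExceptionGroup as beg:
        # `split()` returns a (matched-subgroup, rest-subgroup) pair
        remote_errs, rest = beg.split(tractor.RemoteActorError)
        assert remote_errs is not None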


@pytest.mark.parametrize('delay', (0, 0.5))
@pytest.mark.parametrize(
    'num_subactors', range(25, 26),
)
def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay):
    """Verify we raise a ``BaseExceptionGroup`` out of a nursery where
    more than one actor errors and also with a delay before failure
    to test failure during an ongoing spawning.
    """
    async def main():
        async with tractor.open_nursery(
            registry_addrs=[reg_addr],
        ) as nursery:

            for i in range(num_subactors):
                await nursery.run_in_actor(
                    assert_err,
                    name=f'errorer{i}',
                    delay=delay
                )

    # with pytest.raises(trio.MultiError) as exc_info:
    with pytest.raises(BaseExceptionGroup) as exc_info:
        trio.run(main)

    assert exc_info.type == ExceptionGroup
    err = exc_info.value
    exceptions = err.exceptions

    if len(exceptions) == 2:
        # sometimes oddly now there's an embedded BrokenResourceError ?
        for exc in exceptions:
            excs = getattr(exc, 'exceptions', None)
            if excs:
                exceptions = excs
                break

    assert len(exceptions) == num_subactors

    for exc in exceptions:
        assert isinstance(exc, tractor.RemoteActorError)
        assert exc.boxed_type == AssertionError


async def do_nothing():
    pass


@pytest.mark.parametrize('mechanism', ['nursery_cancel', KeyboardInterrupt])
def test_cancel_single_subactor(reg_addr, mechanism):
    '''
    Ensure an ``ActorNursery.start_actor()``-spawned subactor
    cancels when the nursery is cancelled.

    '''
    async def spawn_actor():
        '''
        Spawn an actor that blocks indefinitely then cancel via
        either `ActorNursery.cancel()` or an exception raise.

        '''
        async with tractor.open_nursery(
            registry_addrs=[reg_addr],
        ) as nursery:

            portal = await nursery.start_actor(
                'nothin', enable_modules=[__name__],
            )
            assert (await portal.run(do_nothing)) is None

            if mechanism == 'nursery_cancel':
                # would hang otherwise
                await nursery.cancel()
            else:
                raise mechanism

    if mechanism == 'nursery_cancel':
        trio.run(spawn_actor)
    else:
        with pytest.raises(mechanism):
            trio.run(spawn_actor)


async def stream_forever():
    for i in repeat("I can see these little future bubble things"):
        # each yielded value is sent over the ``Channel`` to the
        # parent actor
        yield i
        await trio.sleep(0.01)


@tractor_test
async def test_cancel_infinite_streamer(start_method):

    # stream for at most 1 second
    with trio.move_on_after(1) as cancel_scope:
        async with tractor.open_nursery() as n:
            portal = await n.start_actor(
                'donny',
                enable_modules=[__name__],
            )

            # this async for loop streams values from the above
            # async generator running in a separate process
            async with portal.open_stream_from(stream_forever) as stream:
                async for letter in stream:
                    print(letter)

    # we support trio's cancellation system
    assert cancel_scope.cancelled_caught
    assert n.cancelled


@pytest.mark.parametrize(
    'num_actors_and_errs',
    [
        # daemon actors sit idle while single task actors error out
        (1, tractor.RemoteActorError, AssertionError, (assert_err, {}), None),
        (2, BaseExceptionGroup, AssertionError, (assert_err, {}), None),
        (3, BaseExceptionGroup, AssertionError, (assert_err, {}), None),

        # 1 daemon actor errors out while single task actors sleep forever
        (3, tractor.RemoteActorError, AssertionError, (sleep_forever, {}),
         (assert_err, {}, True)),

        # daemon actors error out after brief delay while single task
        # actors complete quickly
        (3, tractor.RemoteActorError, AssertionError,
         (do_nuthin, {}), (assert_err, {'delay': 1}, True)),

        # daemon actors complete quickly while single task
        # actors error out after a brief delay
        (3, BaseExceptionGroup, AssertionError,
         (assert_err, {'delay': 1}), (do_nuthin, {}, False)),
    ],
    ids=[
        '1_run_in_actor_fails',
        '2_run_in_actors_fail',
        '3_run_in_actors_fail',
        '1_daemon_actors_fail',
        '1_daemon_actors_fail_all_run_in_actors_dun_quick',
        'no_daemon_actors_fail_all_run_in_actors_sleep_then_fail',
    ],
)
@tractor_test
async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
    """Verify a subset of failed subactors causes all others in
    the nursery to be cancelled just like the strategy in trio.
    This is the first and only supervisory strategy at the moment.
    """
    num_actors, first_err, err_type, ria_func, da_func = num_actors_and_errs
    try:
        async with tractor.open_nursery() as n:

            # spawn the same number of daemon actors which should be cancelled
            dactor_portals = []
            for i in range(num_actors):
                dactor_portals.append(await n.start_actor(
                    f'daemon_{i}',
                    enable_modules=[__name__],
                ))

            func, kwargs = ria_func
            riactor_portals = []
            for i in range(num_actors):
                # start actor(s) that will fail immediately
                riactor_portals.append(
                    await n.run_in_actor(
                        func,
                        name=f'actor_{i}',
                        **kwargs
                    )
                )

            if da_func:
                func, kwargs, expect_error = da_func
                for portal in dactor_portals:
                    # if this function fails then we should error here
                    # and the nursery should teardown all other actors
                    try:
                        await portal.run(func, **kwargs)

                    except tractor.RemoteActorError as err:
                        assert err.boxed_type == err_type
                        # we only expect this first error to propagate
                        # (all other daemons are cancelled before they
                        # can be scheduled)
                        num_actors = 1
                        # reraise so nursery teardown is triggered
                        raise
                    else:
                        if expect_error:
                            pytest.fail(
                                "Daemon call should fail at checkpoint?")

    # should error here with a ``RemoteActorError`` or ``BaseExceptionGroup``
    except first_err as err:
        if isinstance(err, BaseExceptionGroup):
            assert len(err.exceptions) == num_actors
            for exc in err.exceptions:
                if isinstance(exc, tractor.RemoteActorError):
                    assert exc.boxed_type == err_type
                else:
                    assert isinstance(exc, trio.Cancelled)

        elif isinstance(err, tractor.RemoteActorError):
            assert err.boxed_type == err_type

        assert n.cancelled is True
        assert not n._children

    else:
        pytest.fail("Should have gotten a remote assertion error?")


async def spawn_and_error(breadth, depth) -> None:
    name = tractor.current_actor().name
    async with tractor.open_nursery() as nursery:
        for i in range(breadth):

            if depth > 0:
                args = (
                    spawn_and_error,
                )
                kwargs = {
                    'name': f'spawner_{i}_depth_{depth}',
                    'breadth': breadth,
                    'depth': depth - 1,
                }
            else:
                args = (
                    assert_err,
                )
                kwargs = {
                    'name': f'{name}_errorer_{i}',
                }
            await nursery.run_in_actor(*args, **kwargs)


@tractor_test
async def test_nested_multierrors(loglevel, start_method):
    '''
    Test that failed actor sets are wrapped in `BaseExceptionGroup`s. This
    test goes only 2 nurseries deep but we should eventually have tests
    for arbitrary n-depth actor trees.

    '''
    if start_method == 'trio':
        depth = 3
        subactor_breadth = 2
    else:
        # XXX: multiprocessing can't seem to handle any more than 2-deep
        # process trees for whatever reason.
        # Any more process levels than this and we see bugs that cause
        # hangs and broken pipes all over the place...
        if start_method == 'forkserver':
            pytest.skip("Forkserver sux hard at nested spawning...")
        depth = 1  # means an additional actor tree of spawning (2 levels deep)
        subactor_breadth = 2

    with trio.fail_after(120):
        try:
            async with tractor.open_nursery() as nursery:
                for i in range(subactor_breadth):
                    await nursery.run_in_actor(
                        spawn_and_error,
                        name=f'spawner_{i}',
                        breadth=subactor_breadth,
                        depth=depth,
                    )
        except BaseExceptionGroup as err:
            assert len(err.exceptions) == subactor_breadth
            for subexc in err.exceptions:

                # verify first level actor errors are wrapped as remote
                if is_win():

                    # windows is often too slow and cancellation seems
                    # to happen before an actor is spawned
                    if isinstance(subexc, trio.Cancelled):
                        continue

                    elif isinstance(subexc, tractor.RemoteActorError):
                        # on windows it seems we can't exactly be sure wtf
                        # will happen..
                        assert subexc.boxed_type in (
                            tractor.RemoteActorError,
                            trio.Cancelled,
                            BaseExceptionGroup,
                        )

                    elif isinstance(subexc, BaseExceptionGroup):
                        for subsub in subexc.exceptions:

                            if isinstance(subsub, tractor.RemoteActorError):
                                subsub = subsub.boxed_type

                            assert type(subsub) in (
                                trio.Cancelled,
                                BaseExceptionGroup,
                            )
                else:
                    assert isinstance(subexc, tractor.RemoteActorError)

                if depth > 0 and subactor_breadth > 1:
                    # XXX not sure what's up with this..
                    # on windows sometimes spawning is just too slow and
                    # we get back the (sent) cancel signal instead
                    if is_win():
                        if isinstance(subexc, tractor.RemoteActorError):
                            assert subexc.boxed_type in (
                                BaseExceptionGroup,
                                tractor.RemoteActorError
                            )
                        else:
                            assert isinstance(subexc, BaseExceptionGroup)
                    else:
                        assert subexc.boxed_type is ExceptionGroup
                else:
                    assert subexc.boxed_type in (
                        tractor.RemoteActorError,
                        trio.Cancelled
                    )
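

# Helper sketch for the nested-group assertions above: recursively
# flatten a (possibly nested) `BaseExceptionGroup` into its leaf
# exceptions using only the stdlib `.exceptions` attribute.
def _flatten_eg(
    beg: BaseExceptionGroup,
) -> list[BaseException]:
    leaves: list[BaseException] = []
    for exc in beg.exceptions:
        if isinstance(exc, BaseExceptionGroup):
            leaves.extend(_flatten_eg(exc))
        else:
            leaves.append(exc)
    return leaves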


@no_windows
def test_cancel_via_SIGINT(
    loglevel,
    start_method,
    spawn_backend,
):
    """Ensure that a control-C (SIGINT) signal cancels both the parent and
    child processes in trionic fashion
    """
    pid = os.getpid()

    async def main():
        with trio.fail_after(2):
            async with tractor.open_nursery() as tn:
                await tn.start_actor('sucka')
                if 'mp' in spawn_backend:
                    time.sleep(0.1)
                os.kill(pid, signal.SIGINT)
                await trio.sleep_forever()

    with pytest.raises(KeyboardInterrupt):
        trio.run(main)


@no_windows
def test_cancel_via_SIGINT_other_task(
    loglevel,
    start_method,
    spawn_backend,
):
    """Ensure that a control-C (SIGINT) signal cancels both the parent
    and child processes in trionic fashion even when a subprocess is
    started from a separate ``trio`` child task.
    """
    pid = os.getpid()
    timeout: float = 2
    if is_win():  # smh
        timeout += 1

    async def spawn_and_sleep_forever(task_status=trio.TASK_STATUS_IGNORED):
        async with tractor.open_nursery() as tn:
            for i in range(3):
                await tn.run_in_actor(
                    sleep_forever,
                    name='namesucka',
                )
            task_status.started()
            await trio.sleep_forever()

    async def main():
        # should never timeout since SIGINT should cancel the current program
        with trio.fail_after(timeout):
            async with trio.open_nursery() as n:
                await n.start(spawn_and_sleep_forever)
                if 'mp' in spawn_backend:
                    time.sleep(0.1)
                os.kill(pid, signal.SIGINT)

    with pytest.raises(KeyboardInterrupt):
        trio.run(main)
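

# The self-signal pattern used by both SIGINT tests above, reduced to a
# standalone sketch: raising SIGINT in-process should surface from
# `trio.run()` as a `KeyboardInterrupt` (assuming default signal
# handling); the `_demo_` name is illustrative only.
def _demo_self_sigint() -> None:

    async def main() -> None:
        os.kill(os.getpid(), signal.SIGINT)
        await trio.sleep_forever()

    with pytest.raises(KeyboardInterrupt):
        trio.run(main)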


async def spin_for(period=3):
    "Sync sleep."
    time.sleep(period)


async def spawn():
    async with tractor.open_nursery() as tn:
        await tn.run_in_actor(
            spin_for,
            name='sleeper',
        )


@no_windows
def test_cancel_while_childs_child_in_sync_sleep(
    loglevel,
    start_method,
    spawn_backend,
):
    """Verify that a child cancelled while executing sync code is torn
    down even when that cancellation is triggered by the parent
    2 nurseries "up".
    """
    if start_method == 'forkserver':
        pytest.skip("Forkserver sux hard at resuming from sync sleep...")

    async def main():
        with trio.fail_after(2):
            async with tractor.open_nursery() as tn:
                await tn.run_in_actor(
                    spawn,
                    name='spawn',
                )
                await trio.sleep(1)
                assert 0

    with pytest.raises(AssertionError):
        trio.run(main)


def test_fast_graceful_cancel_when_spawn_task_in_soft_proc_wait_for_daemon(
    start_method,
):
    '''
    This is a very subtle test which demonstrates how cancellation
    during process collection can result in non-optimal teardown
    performance on daemon actors. The fix for this test was to handle
    ``trio.Cancelled`` specially in the spawn task waiting in
    `proc.wait()` such that ``Portal.cancel_actor()`` is called before
    executing the "hard reap" sequence (which has an up to 3 second
    delay currently).

    In other words, if we can cancel the actor using a graceful remote
    cancellation, and it's faster, we might as well do it.

    '''
    kbi_delay = 0.5
    timeout: float = 2.9
    if is_win():  # smh
        timeout += 1

    async def main():
        start = time.time()
        try:
            async with trio.open_nursery() as nurse:
                async with tractor.open_nursery() as tn:
                    p = await tn.start_actor(
                        'fast_boi',
                        enable_modules=[__name__],
                    )

                    async def delayed_kbi():
                        await trio.sleep(kbi_delay)
                        print(f'RAISING KBI after {kbi_delay} s')
                        raise KeyboardInterrupt

                    # start task which raises a kbi **after**
                    # the actor nursery ``__aexit__()`` has
                    # been run.
                    nurse.start_soon(delayed_kbi)

                    await p.run(do_nuthin)
        finally:
            duration = time.time() - start
            if duration > timeout:
                raise trio.TooSlowError(
                    'daemon cancel was slower than necessary..'
                )

    with pytest.raises(KeyboardInterrupt):
        trio.run(main)