Add timeout around inf-streamer suite
Since with the new actorc injection seems to be hanging? Not sure what exactly the issue is but likely races again during teardown between the `.run_in_actor()` remote-exc capture and any actorc after the `portal.cancel()`.. Also tossed in a bp to figure out why actorcs aren't actually showing outside the `trio.run()`..?actor_cancelled_exc_type
parent
f6ba50979b
commit
b8019f90ec
|
@ -11,6 +11,9 @@ from itertools import repeat
|
||||||
import pytest
|
import pytest
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
|
from tractor._exceptions import (
|
||||||
|
ActorCancelled,
|
||||||
|
)
|
||||||
from tractor._testing import (
|
from tractor._testing import (
|
||||||
tractor_test,
|
tractor_test,
|
||||||
)
|
)
|
||||||
|
@ -124,7 +127,10 @@ def test_multierror(
|
||||||
) as nursery:
|
) as nursery:
|
||||||
|
|
||||||
await nursery.run_in_actor(assert_err, name='errorer1')
|
await nursery.run_in_actor(assert_err, name='errorer1')
|
||||||
portal2 = await nursery.run_in_actor(assert_err, name='errorer2')
|
portal2 = await nursery.run_in_actor(
|
||||||
|
assert_err,
|
||||||
|
name='errorer2',
|
||||||
|
)
|
||||||
|
|
||||||
# get result(s) from main task
|
# get result(s) from main task
|
||||||
try:
|
try:
|
||||||
|
@ -137,7 +143,15 @@ def test_multierror(
|
||||||
# here we should get a ``BaseExceptionGroup`` containing exceptions
|
# here we should get a ``BaseExceptionGroup`` containing exceptions
|
||||||
# from both subactors
|
# from both subactors
|
||||||
|
|
||||||
with pytest.raises(BaseExceptionGroup):
|
with pytest.raises(
|
||||||
|
expected_exception=(
|
||||||
|
tractor.RemoteActorError,
|
||||||
|
|
||||||
|
# ?TODO, should it be this??
|
||||||
|
# like `trio`'s strict egs?
|
||||||
|
BaseExceptionGroup,
|
||||||
|
),
|
||||||
|
):
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
@ -233,8 +247,9 @@ async def stream_forever():
|
||||||
|
|
||||||
|
|
||||||
@tractor_test
|
@tractor_test
|
||||||
async def test_cancel_infinite_streamer(start_method):
|
async def test_cancel_infinite_streamer(
|
||||||
|
start_method: str,
|
||||||
|
):
|
||||||
# stream for at most 1 seconds
|
# stream for at most 1 seconds
|
||||||
with (
|
with (
|
||||||
trio.fail_after(4),
|
trio.fail_after(4),
|
||||||
|
@ -291,6 +306,7 @@ async def test_some_cancels_all(
|
||||||
num_actors_and_errs: tuple,
|
num_actors_and_errs: tuple,
|
||||||
start_method: str,
|
start_method: str,
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
|
debug_mode: bool,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Verify a subset of failed subactors causes all others in
|
Verify a subset of failed subactors causes all others in
|
||||||
|
@ -306,68 +322,81 @@ async def test_some_cancels_all(
|
||||||
ria_func,
|
ria_func,
|
||||||
da_func,
|
da_func,
|
||||||
) = num_actors_and_errs
|
) = num_actors_and_errs
|
||||||
try:
|
with trio.fail_after(
|
||||||
async with tractor.open_nursery() as an:
|
3
|
||||||
|
if not debug_mode
|
||||||
|
else 999
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
async with tractor.open_nursery() as an:
|
||||||
|
|
||||||
# spawn the same number of deamon actors which should be cancelled
|
# spawn the same number of deamon actors which should be cancelled
|
||||||
dactor_portals = []
|
dactor_portals = []
|
||||||
for i in range(num_actors):
|
for i in range(num_actors):
|
||||||
dactor_portals.append(await an.start_actor(
|
dactor_portals.append(await an.start_actor(
|
||||||
f'deamon_{i}',
|
f'deamon_{i}',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
))
|
))
|
||||||
|
|
||||||
func, kwargs = ria_func
|
func, kwargs = ria_func
|
||||||
riactor_portals = []
|
riactor_portals = []
|
||||||
for i in range(num_actors):
|
for i in range(num_actors):
|
||||||
# start actor(s) that will fail immediately
|
# start actor(s) that will fail immediately
|
||||||
riactor_portals.append(
|
riactor_portals.append(
|
||||||
await an.run_in_actor(
|
await an.run_in_actor(
|
||||||
func,
|
func,
|
||||||
name=f'actor_{i}',
|
name=f'actor_{i}',
|
||||||
**kwargs
|
**kwargs
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
|
||||||
|
|
||||||
if da_func:
|
if da_func:
|
||||||
func, kwargs, expect_error = da_func
|
func, kwargs, expect_error = da_func
|
||||||
for portal in dactor_portals:
|
for portal in dactor_portals:
|
||||||
# if this function fails then we should error here
|
# if this function fails then we should error here
|
||||||
# and the nursery should teardown all other actors
|
# and the nursery should teardown all other actors
|
||||||
try:
|
try:
|
||||||
await portal.run(func, **kwargs)
|
await portal.run(func, **kwargs)
|
||||||
|
|
||||||
except tractor.RemoteActorError as err:
|
except tractor.RemoteActorError as err:
|
||||||
assert err.boxed_type == err_type
|
assert err.boxed_type == err_type
|
||||||
# we only expect this first error to propogate
|
# we only expect this first error to propogate
|
||||||
# (all other daemons are cancelled before they
|
# (all other daemons are cancelled before they
|
||||||
# can be scheduled)
|
# can be scheduled)
|
||||||
num_actors = 1
|
num_actors = 1
|
||||||
# reraise so nursery teardown is triggered
|
# reraise so nursery teardown is triggered
|
||||||
raise
|
raise
|
||||||
|
else:
|
||||||
|
if expect_error:
|
||||||
|
pytest.fail(
|
||||||
|
"Deamon call should fail at checkpoint?")
|
||||||
|
|
||||||
|
# should error here with a ``RemoteActorError`` or ``MultiError``
|
||||||
|
|
||||||
|
except first_err as _err:
|
||||||
|
err = _err
|
||||||
|
|
||||||
|
if isinstance(err, BaseExceptionGroup):
|
||||||
|
|
||||||
|
assert len(err.exceptions) == num_actors
|
||||||
|
for exc in err.exceptions:
|
||||||
|
|
||||||
|
# TODO, figure out why these aren't being set?
|
||||||
|
if isinstance(exc, ActorCancelled):
|
||||||
|
breakpoint()
|
||||||
|
|
||||||
|
if isinstance(exc, tractor.RemoteActorError):
|
||||||
|
assert exc.boxed_type == err_type
|
||||||
else:
|
else:
|
||||||
if expect_error:
|
assert isinstance(exc, trio.Cancelled)
|
||||||
pytest.fail(
|
|
||||||
"Deamon call should fail at checkpoint?")
|
|
||||||
|
|
||||||
# should error here with a ``RemoteActorError`` or ``MultiError``
|
elif isinstance(err, tractor.RemoteActorError):
|
||||||
|
assert err.boxed_type == err_type
|
||||||
|
|
||||||
except first_err as _err:
|
assert an.cancelled is True
|
||||||
err = _err
|
assert not an._children
|
||||||
if isinstance(err, BaseExceptionGroup):
|
else:
|
||||||
assert len(err.exceptions) == num_actors
|
pytest.fail("Should have gotten a remote assertion error?")
|
||||||
for exc in err.exceptions:
|
|
||||||
if isinstance(exc, tractor.RemoteActorError):
|
|
||||||
assert exc.boxed_type == err_type
|
|
||||||
else:
|
|
||||||
assert isinstance(exc, trio.Cancelled)
|
|
||||||
elif isinstance(err, tractor.RemoteActorError):
|
|
||||||
assert err.boxed_type == err_type
|
|
||||||
|
|
||||||
assert an.cancelled is True
|
|
||||||
assert not an._children
|
|
||||||
else:
|
|
||||||
pytest.fail("Should have gotten a remote assertion error?")
|
|
||||||
|
|
||||||
|
|
||||||
async def spawn_and_error(breadth, depth) -> None:
|
async def spawn_and_error(breadth, depth) -> None:
|
||||||
|
|
Loading…
Reference in New Issue