forked from goodboy/tractor
Leverage `pytest.raises()` better; fix a bunch of docs
parent
3e74cc6f11
commit
a482681f9c
|
@ -26,8 +26,9 @@ async def assert_err():
|
||||||
)
|
)
|
||||||
def test_remote_error(arb_addr, args_err):
|
def test_remote_error(arb_addr, args_err):
|
||||||
"""Verify an error raised in a subactor that is propagated
|
"""Verify an error raised in a subactor that is propagated
|
||||||
to the parent nursery, contains underlying builtin erorr type
|
to the parent nursery, contains the underlying boxed builtin
|
||||||
infot and causes cancellation and reraising.
|
error type info and causes cancellation and reraising all the
|
||||||
|
way up the stack.
|
||||||
"""
|
"""
|
||||||
args, errtype = args_err
|
args, errtype = args_err
|
||||||
|
|
||||||
|
@ -43,13 +44,13 @@ def test_remote_error(arb_addr, args_err):
|
||||||
assert err.type == errtype
|
assert err.type == errtype
|
||||||
print("Look Maa that actor failed hard, hehh")
|
print("Look Maa that actor failed hard, hehh")
|
||||||
raise
|
raise
|
||||||
else:
|
|
||||||
assert 0, "Remote error was not raised?"
|
|
||||||
|
|
||||||
with pytest.raises(tractor.RemoteActorError):
|
with pytest.raises(tractor.RemoteActorError) as excinfo:
|
||||||
# also raises
|
|
||||||
tractor.run(main, arbiter_addr=arb_addr)
|
tractor.run(main, arbiter_addr=arb_addr)
|
||||||
|
|
||||||
|
# ensure boxed error is correct
|
||||||
|
assert excinfo.value.type == errtype
|
||||||
|
|
||||||
|
|
||||||
def test_multierror(arb_addr):
|
def test_multierror(arb_addr):
|
||||||
"""Verify we raise a ``trio.MultiError`` out of a nursery where
|
"""Verify we raise a ``trio.MultiError`` out of a nursery where
|
||||||
|
@ -139,8 +140,8 @@ async def test_cancel_infinite_streamer():
|
||||||
)
|
)
|
||||||
@tractor_test
|
@tractor_test
|
||||||
async def test_some_cancels_all(num_actors_and_errs):
|
async def test_some_cancels_all(num_actors_and_errs):
|
||||||
"""Verify one failed actor causes all others in the nursery
|
"""Verify a subset of failed subactors causes all others in
|
||||||
to be cancelled just like in trio.
|
the nursery to be cancelled just like the strategy in trio.
|
||||||
|
|
||||||
This is the first and only supervisory strategy at the moment.
|
This is the first and only supervisory strategy at the moment.
|
||||||
"""
|
"""
|
||||||
|
@ -155,11 +156,10 @@ async def test_some_cancels_all(num_actors_and_errs):
|
||||||
))
|
))
|
||||||
|
|
||||||
for i in range(num):
|
for i in range(num):
|
||||||
# start one actor that will fail immediately
|
# start actor(s) that will fail immediately
|
||||||
await n.run_in_actor(f'extra_{i}', assert_err)
|
await n.run_in_actor(f'extra_{i}', assert_err)
|
||||||
|
|
||||||
# should error here with a ``RemoteActorError`` or
|
# should error here with a ``RemoteActorError`` or ``MultiError``
|
||||||
# ``MultiError`` containing an ``AssertionError`
|
|
||||||
|
|
||||||
except first_err as err:
|
except first_err as err:
|
||||||
if isinstance(err, tractor.MultiError):
|
if isinstance(err, tractor.MultiError):
|
||||||
|
|
|
@ -151,7 +151,7 @@ class ActorNursery:
|
||||||
error propagation, and graceful subprocess tear down.
|
error propagation, and graceful subprocess tear down.
|
||||||
"""
|
"""
|
||||||
async def exhaust_portal(portal, actor):
|
async def exhaust_portal(portal, actor):
|
||||||
"""Pull final result from portal (assuming it was one).
|
"""Pull final result from portal (assuming it has one).
|
||||||
|
|
||||||
If the main task is an async generator do our best to consume
|
If the main task is an async generator do our best to consume
|
||||||
what's left of it.
|
what's left of it.
|
||||||
|
@ -169,7 +169,7 @@ class ActorNursery:
|
||||||
async for item in agen:
|
async for item in agen:
|
||||||
log.debug(f"Consuming item {item}")
|
log.debug(f"Consuming item {item}")
|
||||||
final.append(item)
|
final.append(item)
|
||||||
except Exception as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
# we reraise in the parent task via a ``trio.MultiError``
|
# we reraise in the parent task via a ``trio.MultiError``
|
||||||
return err
|
return err
|
||||||
else:
|
else:
|
||||||
|
@ -187,8 +187,9 @@ class ActorNursery:
|
||||||
"""
|
"""
|
||||||
with trio.open_cancel_scope() as cs:
|
with trio.open_cancel_scope() as cs:
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
# this may error in which case we expect the far end
|
# if this call errors we store the exception for later
|
||||||
# actor to have already terminated itself
|
# in ``errors`` which will be reraised inside
|
||||||
|
# a MultiError and we still send out a cancel request
|
||||||
result = await exhaust_portal(portal, actor)
|
result = await exhaust_portal(portal, actor)
|
||||||
if isinstance(result, Exception):
|
if isinstance(result, Exception):
|
||||||
errors.append(result)
|
errors.append(result)
|
||||||
|
@ -221,6 +222,8 @@ class ActorNursery:
|
||||||
cancel_scope.cancel()
|
cancel_scope.cancel()
|
||||||
|
|
||||||
log.debug(f"Waiting on all subactors to complete")
|
log.debug(f"Waiting on all subactors to complete")
|
||||||
|
# since we pop each child subactor on termination,
|
||||||
|
# iterate a copy
|
||||||
children = self._children.copy()
|
children = self._children.copy()
|
||||||
errors: List[Exception] = []
|
errors: List[Exception] = []
|
||||||
# wait on run_in_actor() tasks, unblocks when all complete
|
# wait on run_in_actor() tasks, unblocks when all complete
|
||||||
|
@ -355,5 +358,6 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
|
||||||
|
|
||||||
|
|
||||||
def is_main_process():
|
def is_main_process():
|
||||||
"Bool determining if this actor is running in the top-most process."
|
"""Bool determining if this actor is running in the top-most process.
|
||||||
|
"""
|
||||||
return mp.current_process().name == 'MainProcess'
|
return mp.current_process().name == 'MainProcess'
|
||||||
|
|
Loading…
Reference in New Issue