
Merge pull request #42 from tgoodlet/improved_errors

Improved errors - introduce trio.MultiError
goodboy 2018-11-22 14:56:11 -05:00 committed by GitHub
commit b0f7e6a954
8 changed files with 367 additions and 148 deletions
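
For context, a minimal usage sketch of the behaviour this change set introduces (not part of the diff; the actor names and the failing task below are hypothetical): when more than one subactor errors, ``tractor.run()`` now raises a ``trio.MultiError`` whose entries include ``RemoteActorError``s boxing the builtin exception type raised remotely.

    import tractor
    import trio

    async def fail():
        # hypothetical remote task that errors
        assert 0

    async def main():
        async with tractor.open_nursery() as nursery:
            await nursery.run_in_actor('bad_actor_1', fail)
            await nursery.run_in_actor('bad_actor_2', fail)

    try:
        tractor.run(main)
    except trio.MultiError as merr:
        for exc in merr.exceptions:
            # some entries may be trio.Cancelled from sibling cancellation
            if isinstance(exc, tractor.RemoteActorError):
                assert exc.type is AssertionError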

View File

@@ -14,26 +14,66 @@ async def assert_err():
     assert 0


-def test_remote_error(arb_addr):
-    """Verify an error raises in a subactor is propagated to the parent.
+@pytest.mark.parametrize(
+    'args_err',
+    [
+        # expected to be thrown in assert_err
+        ({}, AssertionError),
+        # argument mismatch raised in _invoke()
+        ({'unexpected': 10}, TypeError)
+    ],
+    ids=['no_args', 'unexpected_args'],
+)
+def test_remote_error(arb_addr, args_err):
+    """Verify an error raised in a subactor that is propagated
+    to the parent nursery, contains the underlying boxed builtin
+    error type info and causes cancellation and reraising all the
+    way up the stack.
+    """
+    args, errtype = args_err
+
+    async def main():
+        async with tractor.open_nursery() as nursery:
+            portal = await nursery.run_in_actor('errorer', assert_err, **args)
+            # get result(s) from main task
+            try:
+                await portal.result()
+            except tractor.RemoteActorError as err:
+                assert err.type == errtype
+                print("Look Maa that actor failed hard, hehh")
+                raise
+
+    with pytest.raises(tractor.RemoteActorError) as excinfo:
+        tractor.run(main, arbiter_addr=arb_addr)
+
+    # ensure boxed error is correct
+    assert excinfo.value.type == errtype
+
+
+def test_multierror(arb_addr):
+    """Verify we raise a ``trio.MultiError`` out of a nursery where
+    more then one actor errors.
     """
     async def main():
         async with tractor.open_nursery() as nursery:
-            portal = await nursery.run_in_actor('errorer', assert_err)
+            await nursery.run_in_actor('errorer1', assert_err)
+            portal2 = await nursery.run_in_actor('errorer2', assert_err)
+
             # get result(s) from main task
             try:
-                return await portal.result()
-            except tractor.RemoteActorError:
-                print("Look Maa that actor failed hard, hehh")
+                await portal2.result()
+            except tractor.RemoteActorError as err:
+                assert err.type == AssertionError
+                print("Look Maa that first actor failed hard, hehh")
                 raise
+            except Exception:
+                pass
+            assert 0, "Remote error was not raised?"

-    with pytest.raises(tractor.RemoteActorError):
-        # also raises
+    # here we should get a `trio.MultiError` containing exceptions
+    # from both subactors
+    with pytest.raises(trio.MultiError):
         tractor.run(main, arbiter_addr=arb_addr)
@@ -42,9 +82,12 @@ def do_nothing():
 def test_cancel_single_subactor(arb_addr):
-    async def main():
+    """Ensure a ``ActorNursery.start_actor()`` spawned subactor
+    cancels when the nursery is cancelled.
+    """
+    async def spawn_actor():
+        """Spawn an actor that blocks indefinitely.
+        """
         async with tractor.open_nursery() as nursery:
             portal = await nursery.start_actor(
@@ -55,7 +98,7 @@ def test_cancel_single_subactor(arb_addr):
             # would hang otherwise
             await nursery.cancel()

-    tractor.run(main, arbiter_addr=arb_addr)
+    tractor.run(spawn_actor, arbiter_addr=arb_addr)


 async def stream_forever():
@@ -87,13 +130,22 @@ async def test_cancel_infinite_streamer():
     assert n.cancelled


+@pytest.mark.parametrize(
+    'num_actors_and_errs',
+    [
+        (1, tractor.RemoteActorError, AssertionError),
+        (2, tractor.MultiError, AssertionError)
+    ],
+    ids=['one_actor', 'two_actors'],
+)
 @tractor_test
-async def test_one_cancels_all():
-    """Verify one failed actor causes all others in the nursery
-    to be cancelled just like in trio.
+async def test_some_cancels_all(num_actors_and_errs):
+    """Verify a subset of failed subactors causes all others in
+    the nursery to be cancelled just like the strategy in trio.

     This is the first and only supervisory strategy at the moment.
     """
+    num, first_err, err_type = num_actors_and_errs
     try:
         async with tractor.open_nursery() as n:
             real_actors = []
@@ -103,13 +155,23 @@ async def test_one_cancels_all():
                 rpc_module_paths=[__name__],
             ))

-            # start one actor that will fail immediately
-            await n.run_in_actor('extra', assert_err)
+            for i in range(num):
+                # start actor(s) that will fail immediately
+                await n.run_in_actor(f'extra_{i}', assert_err)

-        # should error here with a ``RemoteActorError`` containing
-        # an ``AssertionError``
-    except tractor.RemoteActorError:
+        # should error here with a ``RemoteActorError`` or ``MultiError``
+
+    except first_err as err:
+        if isinstance(err, tractor.MultiError):
+            assert len(err.exceptions) == num
+            for exc in err.exceptions:
+                if isinstance(exc, tractor.RemoteActorError):
+                    assert exc.type == err_type
+                else:
+                    assert isinstance(exc, trio.Cancelled)
+        elif isinstance(err, tractor.RemoteActorError):
+            assert err.type == err_type
+
         assert n.cancelled is True
         assert not n._children
     else:

View File

@@ -41,7 +41,7 @@ def daemon(loglevel, testdir, arb_addr):
         stderr=subprocess.PIPE,
     )
     assert not proc.returncode
-    wait = 0.6 if sys.version_info < (3, 7) else 0.2
+    wait = 0.6 if sys.version_info < (3, 7) else 0.4
     time.sleep(wait)
     yield proc
     sig_prog(proc, signal.SIGINT)

View File

@@ -166,7 +166,7 @@ def test_a_quadruple_example(time_quad_ex):
 @pytest.mark.parametrize(
     'cancel_delay',
-    list(map(lambda i: i/10, range(2, 8)))
+    list(map(lambda i: i/10, range(3, 9)))
 )
 def test_not_fast_enough_quad(arb_addr, time_quad_ex, cancel_delay):
     """Verify we can cancel midway through the quad example and all actors

View File

@@ -4,10 +4,11 @@ tractor: An actor model micro-framework built on
 """
 import importlib
 from functools import partial
-from typing import Tuple, Any, Optional
+from typing import Tuple, Any
 import typing

 import trio  # type: ignore
+from trio import MultiError

 from .log import get_console_log, get_logger, get_loglevel
 from ._ipc import _connect_chan, Channel
@@ -16,17 +17,18 @@ from ._actor import (
 )
 from ._trionics import open_nursery
 from ._state import current_actor
-from ._portal import RemoteActorError
+from ._exceptions import RemoteActorError

 __all__ = [
     'current_actor',
     'find_actor',
     'get_arbiter',
-    'wait_for_actor',
     'open_nursery',
-    'RemoteActorError',
+    'wait_for_actor',
     'Channel',
+    'MultiError',
+    'RemoteActorError',
 ]
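
Note that ``MultiError`` is now re-exported at the package top level, so (assuming the sketch near the top of this page) callers can reference it without importing ``trio`` directly:

    try:
        tractor.run(main)  # `main` as in the earlier sketch
    except tractor.MultiError as merr:  # same class as trio.MultiError
        print(f"{len(merr.exceptions)} subactors failed")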

View File

@@ -16,6 +16,7 @@ from async_generator import asynccontextmanager, aclosing
 from ._ipc import Channel, _connect_chan
 from .log import get_console_log, get_logger
+from ._exceptions import pack_error, InternalActorError
 from ._portal import (
     Portal,
     open_portal,
@@ -33,10 +34,6 @@ class ActorFailure(Exception):
     "General actor failure"


-class InternalActorError(RuntimeError):
-    "Actor primitive internals failure"
-
-
 async def _invoke(
     actor: 'Actor',
     cid: str,
@@ -49,6 +46,7 @@ async def _invoke(
     """
     sig = inspect.signature(func)
     treat_as_gen = False
+    cs = None
     if 'chan' in sig.parameters:
         assert 'cid' in sig.parameters, \
             f"{func} must accept a `cid` (caller id) kwarg"
@@ -122,10 +120,19 @@ async def _invoke(
             with trio.open_cancel_scope() as cs:
                 task_status.started(cs)
                 await chan.send({'return': await coro, 'cid': cid})
-    except Exception:
+    except Exception as err:
         # always ship errors back to caller
         log.exception("Actor errored:")
-        await chan.send({'error': traceback.format_exc(), 'cid': cid})
+        err_msg = pack_error(err)
+        err_msg['cid'] = cid
+        try:
+            await chan.send(err_msg)
+        except trio.ClosedResourceError:
+            log.exception(
+                f"Failed to ship error to caller @ {chan.uid}")
+        if cs is None:
+            # error is from above code not from rpc invocation
+            task_status.started(err)
     finally:
         # RPC task bookeeping
         tasks = actor._rpc_tasks.get(chan, None)
@@ -348,13 +355,19 @@ class Actor:
                 try:
                     ns, funcname, kwargs, actorid, cid = msg['cmd']
                 except KeyError:
-                    # push any non-rpc-response error to all local consumers
-                    # and mark the channel as errored
-                    chan._exc = err = msg['error']
+                    # This is the non-rpc error case, that is, an
+                    # error **not** raised inside a call to ``_invoke()``
+                    # (i.e. no cid was provided in the msg - see above).
+                    # Push this error to all local channel consumers
+                    # (normally portals) by marking the channel as errored
+                    tb_str = msg.get('tb_str')
                     assert chan.uid
-                    for cid in self._actors2calls[chan.uid]:
-                        await self._push_result(chan.uid, cid, msg)
-                    raise InternalActorError(f"{chan.uid}\n" + err)
+                    exc = InternalActorError(
+                        f"{chan.uid}\n" + tb_str,
+                        **msg,
+                    )
+                    chan._exc = exc
+                    raise exc

                 log.debug(
                     f"Processing request from {actorid}\n"
@@ -373,22 +386,30 @@ class Actor:
                     # never allow cancelling cancel requests (results in
                     # deadlock and other weird behaviour)
                     if func != self.cancel:
-                        self._no_more_rpc_tasks.clear()
-                        log.info(f"RPC func is {func}")
-                        self._rpc_tasks.setdefault(chan, []).append((cs, func))
+                        if isinstance(cs, Exception):
+                            log.warn(f"Task for RPC func {func} failed with {cs}")
+                        else:
+                            # mark that we have ongoing rpc tasks
+                            self._no_more_rpc_tasks.clear()
+                            log.info(f"RPC func is {func}")
+                            # store cancel scope such that the rpc task can be
+                            # cancelled gracefully if requested
+                            self._rpc_tasks.setdefault(chan, []).append((cs, func))
                     log.debug(
                         f"Waiting on next msg for {chan} from {chan.uid}")
-            else:  # channel disconnect
+            else:
+                # channel disconnect
                 log.debug(f"{chan} from {chan.uid} disconnected")

         except trio.ClosedResourceError:
             log.error(f"{chan} form {chan.uid} broke")
-        except Exception:
-            # ship exception (from above code) to parent
+        except Exception as err:
+            # ship any "internal" exception (i.e. one from internal machinery
+            # not from an rpc task) to parent
             log.exception("Actor errored:")
             if self._parent_chan:
-                await self._parent_chan.send({'error': traceback.format_exc()})
+                await self._parent_chan.send(pack_error(err))
             raise
         # if this is the `MainProcess` we expect the error broadcasting
         # above to trigger an error at consuming portal "checkpoints"
         finally:
@@ -480,25 +501,30 @@ class Actor:
             # blocks here as expected until the channel server is
             # killed (i.e. this actor is cancelled or signalled by the parent)
-        except Exception:
+        except Exception as err:
+            if not registered_with_arbiter:
+                log.exception(
+                    f"Actor errored and failed to register with arbiter "
+                    f"@ {arbiter_addr}")
             if self._parent_chan:
                 try:
+                    # internal error so ship to parent without cid
                     await self._parent_chan.send(
-                        {'error': traceback.format_exc()})
+                        pack_error(err))
                 except trio.ClosedResourceError:
                     log.error(
                         f"Failed to ship error to parent "
                         f"{self._parent_chan.uid}, channel was closed")
                     log.exception("Actor errored:")
-            if not registered_with_arbiter:
-                log.exception(
-                    f"Actor errored and failed to register with arbiter "
-                    f"@ {arbiter_addr}")
             else:
+                # XXX wait, why?
+                # causes a hang if I always raise..
                 raise
         finally:
-            await self._do_unreg(arbiter_addr)
+            if registered_with_arbiter:
+                await self._do_unreg(arbiter_addr)
             # terminate actor once all it's peers (actors that connected
             # to it as clients) have disappeared
             if not self._no_more_peers.is_set():
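
For reference, the error messages shipped over a channel by this new machinery look roughly as follows (a sketch assembled from ``pack_error()`` and the ``_invoke()`` changes above; the field values are illustrative):

    # error raised inside an rpc task scheduled via ``_invoke()``: a 'cid'
    # is attached so the receiving portal can route and re-raise it
    rpc_err_msg = {
        'error': {
            'tb_str': 'Traceback (most recent call last): ...',
            'type_str': 'AssertionError',
        },
        'cid': '10',  # caller id of the originating request (illustrative)
    }

    # "internal" error from the actor machinery itself: no 'cid', so the
    # receiver wraps it in ``InternalActorError`` and marks the channel
    # as errored instead of routing it to a specific portal
    internal_err_msg = {
        'error': {
            'tb_str': 'Traceback (most recent call last): ...',
            'type_str': 'RuntimeError',
        },
    }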

View File

@@ -0,0 +1,50 @@
+"""
+Our classy exception set.
+"""
+import builtins
+import traceback
+
+
+class RemoteActorError(Exception):
+    # TODO: local recontruction of remote exception deats
+    "Remote actor exception bundled locally"
+    def __init__(self, message, type_str, **msgdata):
+        super().__init__(message)
+        self.type = getattr(builtins, type_str, Exception)
+        self.msgdata = msgdata
+
+    # TODO: a trio.MultiError.catch like context manager
+    # for catching underlying remote errors of a particular type
+
+
+class InternalActorError(RemoteActorError):
+    """Remote internal ``tractor`` error indicating
+    failure of some primitive or machinery.
+    """
+
+
+class NoResult(RuntimeError):
+    "No final result is expected for this actor"
+
+
+def pack_error(exc):
+    """Create an "error message" for tranmission over
+    a channel (aka the wire).
+    """
+    return {
+        'error': {
+            'tb_str': traceback.format_exc(),
+            'type_str': type(exc).__name__,
+        }
+    }
+
+
+def unpack_error(msg, chan=None):
+    """Unpack an 'error' message from the wire
+    into a local ``RemoteActorError``.
+    """
+    tb_str = msg['error'].get('tb_str', '')
+    return RemoteActorError(
+        f"{chan.uid}\n" + tb_str,
+        **msg['error'],
+    )
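
A small round-trip sketch (not part of the commit) showing how these helpers cooperate; the channel stand-in below only needs a ``uid`` attribute:

    try:
        assert 0
    except AssertionError as exc:
        # must be called inside the except block since it uses format_exc()
        msg = pack_error(exc)

    class FakeChan:
        # stand-in for a real Channel, for illustration only
        uid = ('errorer', 'some-uuid')

    err = unpack_error(msg, chan=FakeChan())
    assert isinstance(err, RemoteActorError)
    assert err.type is AssertionError  # boxed builtin type recovered locally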

View File

@@ -12,15 +12,12 @@ from async_generator import asynccontextmanager
 from ._state import current_actor
 from ._ipc import Channel
 from .log import get_logger
+from ._exceptions import unpack_error, NoResult, RemoteActorError

 log = get_logger('tractor')


-class RemoteActorError(RuntimeError):
-    "Remote actor exception bundled locally"
-
-
 @asynccontextmanager
 async def maybe_open_nursery(nursery: trio._core._run.Nursery = None):
     """Create a new nursery if None provided.
@@ -63,8 +60,8 @@ class Portal:
         # when this is set to a tuple returned from ``_submit()`` then
         # it is expected that ``result()`` will be awaited at some point
         # during the portal's lifetime
-        self._result = None
-        self._exc: Optional[RemoteActorError] = None
+        self._result: Optional[Any] = None
+        # set when _submit_for_result is called
         self._expect_result: Optional[
             Tuple[str, Any, str, Dict[str, Any]]
         ] = None
@@ -97,8 +94,7 @@ class Portal:
         elif functype == 'asyncgen':
             resp_type = 'yield'
         elif 'error' in first_msg:
-            raise RemoteActorError(
-                f"{self.channel.uid}\n" + first_msg['error'])
+            raise unpack_error(first_msg, self.channel)
         else:
             raise ValueError(f"{first_msg} is an invalid response packet?")
@@ -110,10 +106,11 @@ class Portal:
         self._expect_result = await self._submit(ns, func, **kwargs)

     async def run(self, ns: str, func: str, **kwargs) -> Any:
-        """Submit a function to be scheduled and run by actor, wrap and return
-        its (stream of) result(s).
+        """Submit a remote function to be scheduled and run by actor,
+        wrap and return its (stream of) result(s).

-        This is a blocking call.
+        This is a blocking call and returns either a value from the
+        remote rpc task or a local async generator instance.
         """
         return await self._return_from_resptype(
             *(await self._submit(ns, func, **kwargs))
@@ -137,14 +134,19 @@ class Portal:
                     if 'stop' in msg:
                         break  # far end async gen terminated
                     else:
-                        raise RemoteActorError(
-                            f"{self.channel.uid}\n" + msg['error'])
+                        # internal error should never get here
+                        assert msg.get('cid'), (
+                            "Received internal error at portal?")
+                        raise unpack_error(msg, self.channel)
             except StopAsyncIteration:
                 log.debug(
                     f"Cancelling async gen call {cid} to "
                     f"{self.channel.uid}")
                 raise

+        # TODO: use AsyncExitStack to aclose() all agens
+        # on teardown
         return yield_from_q()

     elif resptype == 'return':
@@ -152,30 +154,43 @@ class Portal:
             try:
                 return msg['return']
             except KeyError:
-                self._exc = RemoteActorError(
-                    f"{self.channel.uid}\n" + msg['error'])
-                raise self._exc
+                # internal error should never get here
+                assert msg.get('cid'), "Received internal error at portal?"
+                raise unpack_error(msg, self.channel)
         else:
             raise ValueError(f"Unknown msg response type: {first_msg}")

     async def result(self) -> Any:
         """Return the result(s) from the remote actor's "main" task.
         """
-        if self._expect_result is None:
-            # (remote) errors are slapped on the channel
-            # teardown can reraise them
-            exc = self.channel._exc
-            if exc:
-                raise RemoteActorError(f"{self.channel.uid}\n{exc}")
-            else:
-                raise RuntimeError(
-                    f"Portal for {self.channel.uid} is not expecting a final"
-                    "result?")
-        elif self._result is None:
-            self._result = await self._return_from_resptype(
-                *self._expect_result
-            )
+        # Check for non-rpc errors slapped on the
+        # channel for which we always raise
+        exc = self.channel._exc
+        if exc:
+            raise exc
+
+        # not expecting a "main" result
+        if self._expect_result is None:
+            log.warn(
+                f"Portal for {self.channel.uid} not expecting a final"
+                " result?\nresult() should only be called if subactor"
+                " was spawned with `ActorNursery.run_in_actor()`")
+            return NoResult
+
+        # expecting a "main" result
+        assert self._expect_result
+        if self._result is None:
+            try:
+                self._result = await self._return_from_resptype(
+                    *self._expect_result
+                )
+            except RemoteActorError as err:
+                self._result = err
+
+        # re-raise error on every call
+        if isinstance(self._result, RemoteActorError):
+            raise self._result
+
         return self._result

     async def close(self) -> None:
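
Based on the reworked ``result()`` above, a hedged sketch of the caller-side semantics (the actor name and failing function mirror the tests; the surrounding nursery is assumed):

    async def check_result(nursery):
        portal = await nursery.run_in_actor('errorer', assert_err)
        try:
            await portal.result()
        except tractor.RemoteActorError as err:
            assert err.type is AssertionError
        # the boxed error is cached and re-raised on every subsequent call
        try:
            await portal.result()
        except tractor.RemoteActorError:
            pass

    # portals for actors started with ``start_actor()`` (no "main" task)
    # now log a warning and return the ``NoResult`` sentinel instead of
    # raising a ``RuntimeError``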

View File

@@ -32,7 +32,8 @@ class ActorNursery:
             Tuple[str, str],
             Tuple[Actor, mp.Process, Optional[Portal]]
         ] = {}
-        # portals spawned with ``run_in_actor()``
+        # portals spawned with ``run_in_actor()`` are
+        # cancelled when their "main" result arrives
         self._cancel_after_result_on_exit: set = set()
         self.cancelled: bool = False
         self._forkserver: forkserver.ForkServer = None
@@ -132,6 +133,8 @@ class ActorNursery:
             bind_addr=bind_addr,
             statespace=statespace,
         )
+        # this marks the actor to be cancelled after its portal result
+        # is retreived, see ``wait()`` below.
         self._cancel_after_result_on_exit.add(portal)
         await portal._submit_for_result(
             mod_path,
@@ -142,29 +145,66 @@ class ActorNursery:
     async def wait(self) -> None:
         """Wait for all subactors to complete.
+
+        This is probably the most complicated (and confusing, sorry)
+        function that does all the clever crap to deal with cancellation,
+        error propagation, and graceful subprocess tear down.
         """
-        async def maybe_consume_result(portal, actor):
-            if (
-                portal in self._cancel_after_result_on_exit and
-                (portal._result is None and portal._exc is None)
-            ):
-                log.debug(f"Waiting on final result from {subactor.uid}")
-                res = await portal.result()
-                # if it's an async-gen then we should alert the user
-                # that we're cancelling it
+        async def exhaust_portal(portal, actor):
+            """Pull final result from portal (assuming it has one).
+
+            If the main task is an async generator do our best to consume
+            what's left of it.
+            """
+            try:
+                log.debug(f"Waiting on final result from {actor.uid}")
+                final = res = await portal.result()
+                # if it's an async-gen then alert that we're cancelling it
                 if inspect.isasyncgen(res):
+                    final = []
                     log.warning(
                         f"Blindly consuming asyncgen for {actor.uid}")
                     with trio.fail_after(1):
                         async with aclosing(res) as agen:
                             async for item in agen:
                                 log.debug(f"Consuming item {item}")
+                                final.append(item)
+            except (Exception, trio.MultiError) as err:
+                # we reraise in the parent task via a ``trio.MultiError``
+                return err
+            else:
+                return final
+
+        async def cancel_on_completion(
+            portal: Portal,
+            actor: Actor,
+            task_status=trio.TASK_STATUS_IGNORED,
+        ) -> None:
+            """Cancel actor gracefully once it's "main" portal's
+            result arrives.
+
+            Should only be called for actors spawned with `run_in_actor()`.
+            """
+            with trio.open_cancel_scope() as cs:
+                task_status.started(cs)
+                # if this call errors we store the exception for later
+                # in ``errors`` which will be reraised inside
+                # a MultiError and we still send out a cancel request
+                result = await exhaust_portal(portal, actor)
+                if isinstance(result, Exception):
+                    errors.append(result)
+                log.info(f"Cancelling {portal.channel.uid} gracefully")
+                await portal.cancel_actor()
+
+            if cs.cancelled_caught:
+                log.warning(
+                    "Result waiter was cancelled, process may have died")

         async def wait_for_proc(
             proc: mp.Process,
             actor: Actor,
             portal: Portal,
-            cancel_scope: trio._core._run.CancelScope,
+            cancel_scope: Optional[trio._core._run.CancelScope] = None,
         ) -> None:
             # TODO: timeout block here?
             if proc.is_alive():
@@ -172,42 +212,59 @@ class ActorNursery:
             # please god don't hang
             proc.join()
             log.debug(f"Joined {proc}")
-            await maybe_consume_result(portal, actor)
             self._children.pop(actor.uid)
-            # proc terminated, cancel result waiter
+
+            # proc terminated, cancel result waiter that may have
+            # been spawned in tandem
             if cancel_scope:
                 log.warning(
                     f"Cancelling existing result waiter task for {actor.uid}")
                 cancel_scope.cancel()

-        async def wait_for_actor(
-            portal: Portal,
-            actor: Actor,
-            task_status=trio.TASK_STATUS_IGNORED,
-        ) -> None:
-            # cancel the actor gracefully
-            with trio.open_cancel_scope() as cs:
-                task_status.started(cs)
-                await maybe_consume_result(portal, actor)
-                log.info(f"Cancelling {portal.channel.uid} gracefully")
-                await portal.cancel_actor()
-
-            if cs.cancelled_caught:
-                log.warning("Result waiter was cancelled")
-
-        # unblocks when all waiter tasks have completed
+        log.debug(f"Waiting on all subactors to complete")
+        # since we pop each child subactor on termination,
+        # iterate a copy
         children = self._children.copy()
+        errors: List[Exception] = []
+        # wait on run_in_actor() tasks, unblocks when all complete
         async with trio.open_nursery() as nursery:
             for subactor, proc, portal in children.values():
                 cs = None
+                # portal from ``run_in_actor()``
                 if portal in self._cancel_after_result_on_exit:
-                    cs = await nursery.start(wait_for_actor, portal, subactor)
-                nursery.start_soon(wait_for_proc, proc, subactor, portal, cs)
+                    cs = await nursery.start(
+                        cancel_on_completion, portal, subactor)
+                # TODO: how do we handle remote host spawned actors?
+                nursery.start_soon(
+                    wait_for_proc, proc, subactor, portal, cs)
+
+        if errors:
+            if not self.cancelled:
+                # halt here and expect to be called again once the nursery
+                # has been cancelled externally (ex. from within __aexit__()
+                # if an error is captured from ``wait()`` then ``cancel()``
+                # is called immediately after which in turn calls ``wait()``
+                # again.)
+                raise trio.MultiError(errors)
+
+        # wait on all `start_actor()` subactors to complete
+        # if errors were captured above and we have not been cancelled
+        # then these ``start_actor()`` spawned actors will block until
+        # cancelled externally
+        children = self._children.copy()
+        async with trio.open_nursery() as nursery:
+            for subactor, proc, portal in children.values():
+                # TODO: how do we handle remote host spawned actors?
+                nursery.start_soon(wait_for_proc, proc, subactor, portal, cs)
+
+        log.debug(f"All subactors for {self} have terminated")
+        if errors:
+            # always raise any error if we're also cancelled
+            raise trio.MultiError(errors)
     async def cancel(self, hard_kill: bool = False) -> None:
         """Cancel this nursery by instructing each subactor to cancel
-        iteslf and wait for all subprocesses to terminate.
+        itself and wait for all subactors to terminate.

         If ``hard_killl`` is set to ``True`` then kill the processes
         directly without any far end graceful ``trio`` cancellation.
@@ -234,56 +291,57 @@ class ActorNursery:
                 # channel/portal should now be up
                 _, _, portal = self._children[subactor.uid]
                 if portal is None:
-                    # cancelled while waiting on the event?
+                    # cancelled while waiting on the event
+                    # to arrive
                     chan = self._actor._peers[subactor.uid][-1]
                     if chan:
                         portal = Portal(chan)
                     else:  # there's no other choice left
                         do_hard_kill(proc)

-            # spawn cancel tasks async
+            # spawn cancel tasks
             assert portal
             n.start_soon(portal.cancel_actor)

-        log.debug(f"Waiting on all subactors to complete")
-        await self.wait()
+        # mark ourselves as having (tried to have) cancelled all subactors
         self.cancelled = True
-        log.debug(f"All subactors for {self} have terminated")
+        await self.wait()

     async def __aexit__(self, etype, value, tb):
         """Wait on all subactor's main routines to complete.
         """
-        try:
-            if etype is not None:
+        if etype is not None:
+            try:
                 # XXX: hypothetically an error could be raised and then
-                # a cancel signal shows up slightly after in which case the
-                # else block here might not complete? Should both be shielded?
+                # a cancel signal shows up slightly after in which case
+                # the `else:` block here might not complete?
+                # For now, shield both.
                 with trio.open_cancel_scope(shield=True):
                     if etype is trio.Cancelled:
                         log.warning(
-                            f"{current_actor().uid} was cancelled with {etype}"
-                            ", cancelling actor nursery")
-                        await self.cancel()
+                            f"Nursery for {current_actor().uid} was "
+                            f"cancelled with {etype}")
                     else:
                         log.exception(
-                            f"{current_actor().uid} errored with {etype}, "
-                            "cancelling actor nursery")
-                        await self.cancel()
-            else:
-                # XXX: this is effectively the lone cancellation/supervisor
-                # strategy which exactly mimicks trio's behaviour
-                log.debug(f"Waiting on subactors {self._children} to complete")
-                try:
-                    await self.wait()
-                except Exception as err:
-                    log.warning(f"Nursery caught {err}, cancelling")
-                    await self.cancel()
-                    raise
-                log.debug(f"Nursery teardown complete")
-        except Exception:
-            log.exception("Error on nursery exit:")
-            await self.wait()
-            raise
+                            f"Nursery for {current_actor().uid} "
+                            f"errored with {etype}, ")
+                    await self.cancel()
+            except trio.MultiError as merr:
+                if value not in merr.exceptions:
+                    raise trio.MultiError(merr.exceptions + [value])
+                raise
+        else:
+            # XXX: this is effectively the (for now) lone
+            # cancellation/supervisor strategy which exactly
+            # mimicks trio's behaviour
+            log.debug(f"Waiting on subactors {self._children} to complete")
+            try:
+                await self.wait()
+            except (Exception, trio.MultiError) as err:
+                log.warning(f"Nursery caught {err}, cancelling")
+                await self.cancel()
+                raise
+            log.debug(f"Nursery teardown complete")


 @asynccontextmanager
@@ -297,3 +355,9 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
     # TODO: figure out supervisors from erlang
     async with ActorNursery(current_actor()) as nursery:
         yield nursery
+
+
+def is_main_process():
+    """Bool determining if this actor is running in the top-most process.
+    """
+    return mp.current_process().name == 'MainProcess'