From 2f6609ab7821d04a1e43c6176e6398508fa17cd9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 19 Nov 2018 03:51:12 -0500 Subject: [PATCH 01/12] Add custom exceptions with msg (un)packing --- tractor/_exceptions.py | 50 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 tractor/_exceptions.py diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py new file mode 100644 index 0000000..7244396 --- /dev/null +++ b/tractor/_exceptions.py @@ -0,0 +1,50 @@ +""" +Our classy exception set. +""" +import builtins +import traceback + + +class RemoteActorError(Exception): + # TODO: local recontruction of remote exception deats + "Remote actor exception bundled locally" + def __init__(self, message, type_str, **msgdata): + super().__init__(message) + self.type = getattr(builtins, type_str, Exception) + self.msgdata = msgdata + + # TODO: a trio.MultiError.catch like context manager + # for catching underlying remote errors of a particular type + + +class InternalActorError(RemoteActorError): + """Remote internal ``tractor`` error indicating + failure of some primitive or machinery. + """ + + +class NoResult(RuntimeError): + "No final result is expected for this actor" + + +def pack_error(exc): + """Create an "error message" for tranmission over + a channel (aka the wire). + """ + return { + 'error': { + 'tb_str': traceback.format_exc(), + 'type_str': type(exc).__name__, + } + } + + +def unpack_error(msg, chan=None): + """Unpack an 'error' message from the wire + into a local ``RemoteActorError``. + """ + tb_str = msg['error'].get('tb_str', '') + return RemoteActorError( + f"{chan.uid}\n" + tb_str, + **msg['error'], + ) From e75b25dc211c3eceaeb8fd3c74f4c81851dc2bfe Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 19 Nov 2018 04:05:07 -0500 Subject: [PATCH 02/12] Improve error propagation machinery Use the new custom error types throughout the actor and portal primitives and set a few new rules: - internal errors are any error not raised by an rpc task and are **not** forwarded to portals but instead are raised directly in the msg loop. - portals always re-raise a "main task" error for every call to ``Portal.result()``. 
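As a quick illustration (not part of the patch itself), here is a minimal
sketch of how the new helpers from ``tractor/_exceptions.py`` round-trip an
error locally, assuming patch 01 is applied; the ``SimpleNamespace`` used as
``chan`` is a hypothetical stand-in providing only the ``uid`` attribute that
``unpack_error()`` reads::

    from types import SimpleNamespace

    from tractor._exceptions import pack_error, unpack_error, RemoteActorError

    try:
        assert 0, "boom"
    except AssertionError as err:
        # what an rpc task would ship back over the channel
        msg = pack_error(err)
        # msg == {'error': {'tb_str': <formatted traceback>,
        #                   'type_str': 'AssertionError'}}

    # hypothetical peer id that a real ``Channel`` would carry
    chan = SimpleNamespace(uid=('errorer', 'some-uuid'))

    # what a portal does when it receives an 'error' msg
    exc = unpack_error(msg, chan)
    assert isinstance(exc, RemoteActorError)
    # the original builtin exception type is recovered locally
    assert exc.type is AssertionError

In the same spirit, once ``Portal.result()`` has captured such a
``RemoteActorError`` it re-raises it on every subsequent call.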
--- tractor/__init__.py | 2 +- tractor/_actor.py | 79 +++++++++++++++++++++++++++++---------------- tractor/_portal.py | 73 ++++++++++++++++++++++++----------------- 3 files changed, 96 insertions(+), 58 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index ca2055f..61e3645 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -16,7 +16,7 @@ from ._actor import ( ) from ._trionics import open_nursery from ._state import current_actor -from ._portal import RemoteActorError +from ._exceptions import RemoteActorError __all__ = [ diff --git a/tractor/_actor.py b/tractor/_actor.py index 8d686d5..671405d 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -16,6 +16,7 @@ from async_generator import asynccontextmanager, aclosing from ._ipc import Channel, _connect_chan from .log import get_console_log, get_logger +from ._exceptions import pack_error, InternalActorError from ._portal import ( Portal, open_portal, @@ -33,10 +34,6 @@ class ActorFailure(Exception): "General actor failure" -class InternalActorError(RuntimeError): - "Actor primitive internals failure" - - async def _invoke( actor: 'Actor', cid: str, @@ -49,6 +46,7 @@ async def _invoke( """ sig = inspect.signature(func) treat_as_gen = False + cs = None if 'chan' in sig.parameters: assert 'cid' in sig.parameters, \ f"{func} must accept a `cid` (caller id) kwarg" @@ -122,10 +120,16 @@ async def _invoke( with trio.open_cancel_scope() as cs: task_status.started(cs) await chan.send({'return': await coro, 'cid': cid}) - except Exception: + except Exception as err: # always ship errors back to caller log.exception("Actor errored:") - await chan.send({'error': traceback.format_exc(), 'cid': cid}) + err_msg = pack_error(err) + err_msg['cid'] = cid + await chan.send(err_msg) + + if cs is None: + # error is from above code not from rpc invocation + task_status.started(err) finally: # RPC task bookeeping tasks = actor._rpc_tasks.get(chan, None) @@ -348,13 +352,19 @@ class Actor: try: ns, funcname, kwargs, actorid, cid = msg['cmd'] except KeyError: - # push any non-rpc-response error to all local consumers - # and mark the channel as errored - chan._exc = err = msg['error'] + # This is the non-rpc error case, that is, an + # error **not** raised inside a call to ``_invoke()`` + # (i.e. no cid was provided in the msg - see above). 
+ # Push this error to all local channel consumers + # (normally portals) by marking the channel as errored + tb_str = msg.pop('tb_str') assert chan.uid - for cid in self._actors2calls[chan.uid]: - await self._push_result(chan.uid, cid, msg) - raise InternalActorError(f"{chan.uid}\n" + err) + exc = InternalActorError( + f"{self.channel.uid}\n" + tb_str, + **msg, + ) + chan._exc = exc + raise exc log.debug( f"Processing request from {actorid}\n" @@ -373,22 +383,30 @@ class Actor: # never allow cancelling cancel requests (results in # deadlock and other weird behaviour) if func != self.cancel: - self._no_more_rpc_tasks.clear() - log.info(f"RPC func is {func}") - self._rpc_tasks.setdefault(chan, []).append((cs, func)) + if isinstance(cs, Exception): + log.warn(f"Task for RPC func {func} failed with {cs}") + else: + # mark that we have ongoing rpc tasks + self._no_more_rpc_tasks.clear() + log.info(f"RPC func is {func}") + # store cancel scope such that the rpc task can be + # cancelled gracefully if requested + self._rpc_tasks.setdefault(chan, []).append((cs, func)) log.debug( f"Waiting on next msg for {chan} from {chan.uid}") - else: # channel disconnect + else: + # channel disconnect log.debug(f"{chan} from {chan.uid} disconnected") except trio.ClosedResourceError: log.error(f"{chan} form {chan.uid} broke") - except Exception: - # ship exception (from above code) to parent + except Exception as err: + # ship any "internal" exception (i.e. one from internal machinery + # not from an rpc task) to parent log.exception("Actor errored:") if self._parent_chan: - await self._parent_chan.send({'error': traceback.format_exc()}) - raise + await self._parent_chan.send(pack_error(err)) + raise # if this is the `MainProcess` we expect the error broadcasting # above to trigger an error at consuming portal "checkpoints" finally: @@ -480,25 +498,30 @@ class Actor: # blocks here as expected until the channel server is # killed (i.e. this actor is cancelled or signalled by the parent) - except Exception: + except Exception as err: + if not registered_with_arbiter: + log.exception( + f"Actor errored and failed to register with arbiter " + f"@ {arbiter_addr}") + if self._parent_chan: try: + # internal error so ship to parent without cid await self._parent_chan.send( - {'error': traceback.format_exc()}) + pack_error(err)) except trio.ClosedResourceError: log.error( f"Failed to ship error to parent " f"{self._parent_chan.uid}, channel was closed") log.exception("Actor errored:") - - if not registered_with_arbiter: - log.exception( - f"Actor errored and failed to register with arbiter " - f"@ {arbiter_addr}") else: + # XXX wait, why? + # causes a hang if I always raise.. raise + finally: - await self._do_unreg(arbiter_addr) + if registered_with_arbiter: + await self._do_unreg(arbiter_addr) # terminate actor once all it's peers (actors that connected # to it as clients) have disappeared if not self._no_more_peers.is_set(): diff --git a/tractor/_portal.py b/tractor/_portal.py index 9dc0cba..83b8e13 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -12,15 +12,12 @@ from async_generator import asynccontextmanager from ._state import current_actor from ._ipc import Channel from .log import get_logger +from ._exceptions import unpack_error, NoResult, RemoteActorError log = get_logger('tractor') -class RemoteActorError(RuntimeError): - "Remote actor exception bundled locally" - - @asynccontextmanager async def maybe_open_nursery(nursery: trio._core._run.Nursery = None): """Create a new nursery if None provided. 
@@ -64,7 +61,7 @@ class Portal: # it is expected that ``result()`` will be awaited at some point # during the portal's lifetime self._result = None - self._exc: Optional[RemoteActorError] = None + # set when _submit_for_result is called self._expect_result: Optional[ Tuple[str, Any, str, Dict[str, Any]] ] = None @@ -97,8 +94,7 @@ class Portal: elif functype == 'asyncgen': resp_type = 'yield' elif 'error' in first_msg: - raise RemoteActorError( - f"{self.channel.uid}\n" + first_msg['error']) + raise unpack_error(first_msg, self.channel) else: raise ValueError(f"{first_msg} is an invalid response packet?") @@ -110,10 +106,11 @@ class Portal: self._expect_result = await self._submit(ns, func, **kwargs) async def run(self, ns: str, func: str, **kwargs) -> Any: - """Submit a function to be scheduled and run by actor, wrap and return - its (stream of) result(s). + """Submit a remote function to be scheduled and run by actor, + wrap and return its (stream of) result(s). - This is a blocking call. + This is a blocking call and returns either a value from the + remote rpc task or a local async generator instance. """ return await self._return_from_resptype( *(await self._submit(ns, func, **kwargs)) @@ -137,14 +134,19 @@ class Portal: if 'stop' in msg: break # far end async gen terminated else: - raise RemoteActorError( - f"{self.channel.uid}\n" + msg['error']) + # internal error should never get here + assert msg.get('cid'), ( + "Received internal error at portal?") + raise unpack_error(msg, self.channel) + except StopAsyncIteration: log.debug( f"Cancelling async gen call {cid} to " f"{self.channel.uid}") raise + # TODO: use AsyncExitStack to aclose() all agens + # on teardown return yield_from_q() elif resptype == 'return': @@ -152,30 +154,43 @@ class Portal: try: return msg['return'] except KeyError: - self._exc = RemoteActorError( - f"{self.channel.uid}\n" + msg['error']) - raise self._exc + # internal error should never get here + assert msg.get('cid'), "Received internal error at portal?" + raise unpack_error(msg, self.channel) else: raise ValueError(f"Unknown msg response type: {first_msg}") async def result(self) -> Any: """Return the result(s) from the remote actor's "main" task. 
""" - if self._expect_result is None: - # (remote) errors are slapped on the channel - # teardown can reraise them - exc = self.channel._exc - if exc: - raise RemoteActorError(f"{self.channel.uid}\n{exc}") - else: - raise RuntimeError( - f"Portal for {self.channel.uid} is not expecting a final" - "result?") + # Check for non-rpc errors slapped on the + # channel for which we always raise + exc = self.channel._exc + if exc: + raise exc + + # not expecting a "main" result + if self._expect_result is None: + log.warn( + f"Portal for {self.channel.uid} not expecting a final" + " result?\nresult() should only be called if subactor" + " was spawned with `ActorNursery.run_in_actor()`") + return NoResult + + # expecting a "main" result + assert self._expect_result + if self._result is None: + try: + self._result = await self._return_from_resptype( + *self._expect_result + ) + except RemoteActorError as err: + self._result = err + + # re-raise error on every call + if isinstance(self._result, RemoteActorError): + raise self._result - elif self._result is None: - self._result = await self._return_from_resptype( - *self._expect_result - ) return self._result async def close(self) -> None: From 835d1fa07a37a413ef0d0e7f0462205e2d25a654 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 19 Nov 2018 04:12:54 -0500 Subject: [PATCH 03/12] Vastly improve error triggered cancellation At the expense of a bit more complexity in `ActorNursery.wait()` (which I commented the heck out of fwiw) this adds far superior and correct cancellation semantics for when a nursery is cancelled due to (remote) errors in subactors. This includes: - `wait()` will now raise a `trio.MultiError` if multiple subactors error with the same semantics as in `trio`. - in `wait()` portals which are paired with `run_in_actor()` spawned subactors (versus `start_actor()`) are waited on separately and if the nursery **hasn't** been cancelled but there are errors those are raised immediately before waiting on `start_actor()` subactors which will block indefinitely if they haven't been explicitly cancelled. - if `wait()` does raise when the nursery hasn't yet been cancelled it's expected that it will be called again depending on the actor supervision strategy (i.e. right now we operate with a one-cancels-all strategy, the same as `trio`, so `ActorNursery.__aexit__() calls `cancel()` if any error is raised by `wait()`). Oh and I added `is_main_process()` helper; can't remember why.. --- tractor/_trionics.py | 182 ++++++++++++++++++++++++++++--------------- 1 file changed, 121 insertions(+), 61 deletions(-) diff --git a/tractor/_trionics.py b/tractor/_trionics.py index dc607c7..a6c4b73 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -32,7 +32,8 @@ class ActorNursery: Tuple[str, str], Tuple[Actor, mp.Process, Optional[Portal]] ] = {} - # portals spawned with ``run_in_actor()`` + # portals spawned with ``run_in_actor()`` are + # cancelled when their "main" result arrives self._cancel_after_result_on_exit: set = set() self.cancelled: bool = False self._forkserver: forkserver.ForkServer = None @@ -132,6 +133,8 @@ class ActorNursery: bind_addr=bind_addr, statespace=statespace, ) + # this marks the actor to be cancelled after its portal result + # is retreived, see ``wait()`` below. self._cancel_after_result_on_exit.add(portal) await portal._submit_for_result( mod_path, @@ -142,29 +145,65 @@ class ActorNursery: async def wait(self) -> None: """Wait for all subactors to complete. 
+ + This is probably the most complicated (and confusing, sorry) + function that does all the clever crap to deal with cancellation, + error propagation, and graceful subprocess tear down. """ - async def maybe_consume_result(portal, actor): - if ( - portal in self._cancel_after_result_on_exit and - (portal._result is None and portal._exc is None) - ): - log.debug(f"Waiting on final result from {subactor.uid}") - res = await portal.result() - # if it's an async-gen then we should alert the user - # that we're cancelling it + async def exhaust_portal(portal, actor): + """Pull final result from portal (assuming it was one). + + If the main task is an async generator do our best to consume + what's left of it. + """ + try: + log.debug(f"Waiting on final result from {actor.uid}") + final = res = await portal.result() + # if it's an async-gen then alert that we're cancelling it if inspect.isasyncgen(res): + final = [] log.warning( f"Blindly consuming asyncgen for {actor.uid}") with trio.fail_after(1): async with aclosing(res) as agen: async for item in agen: log.debug(f"Consuming item {item}") + final.append(item) + except Exception as err: + # we reraise in the parent task via a ``trio.MultiError`` + return err + else: + return final + + async def cancel_on_completion( + portal: Portal, + actor: Actor, + task_status=trio.TASK_STATUS_IGNORED, + ) -> None: + """Cancel actor gracefully once it's "main" portal's + result arrives. + + Should only be called for actors spawned with `run_in_actor()`. + """ + with trio.open_cancel_scope() as cs: + task_status.started(cs) + # this may error in which case we expect the far end + # actor to have already terminated itself + result = await exhaust_portal(portal, actor) + if isinstance(result, Exception): + errors.append(result) + log.info(f"Cancelling {portal.channel.uid} gracefully") + await portal.cancel_actor() + + if cs.cancelled_caught: + log.warning( + "Result waiter was cancelled, process may have died") async def wait_for_proc( proc: mp.Process, actor: Actor, portal: Portal, - cancel_scope: trio._core._run.CancelScope, + cancel_scope: Optional[trio._core._run.CancelScope] = None, ) -> None: # TODO: timeout block here? 
if proc.is_alive(): @@ -172,42 +211,57 @@ class ActorNursery: # please god don't hang proc.join() log.debug(f"Joined {proc}") - await maybe_consume_result(portal, actor) - self._children.pop(actor.uid) - # proc terminated, cancel result waiter + + # proc terminated, cancel result waiter that may have + # been spawned in tandem if cancel_scope: log.warning( f"Cancelling existing result waiter task for {actor.uid}") cancel_scope.cancel() - async def wait_for_actor( - portal: Portal, - actor: Actor, - task_status=trio.TASK_STATUS_IGNORED, - ) -> None: - # cancel the actor gracefully - with trio.open_cancel_scope() as cs: - task_status.started(cs) - await maybe_consume_result(portal, actor) - log.info(f"Cancelling {portal.channel.uid} gracefully") - await portal.cancel_actor() - - if cs.cancelled_caught: - log.warning("Result waiter was cancelled") - - # unblocks when all waiter tasks have completed + log.debug(f"Waiting on all subactors to complete") children = self._children.copy() + errors = [] + # wait on run_in_actor() tasks, unblocks when all complete async with trio.open_nursery() as nursery: for subactor, proc, portal in children.values(): cs = None + # portal from ``run_in_actor()`` if portal in self._cancel_after_result_on_exit: - cs = await nursery.start(wait_for_actor, portal, subactor) + cs = await nursery.start( + cancel_on_completion, portal, subactor) + # TODO: how do we handle remote host spawned actors? + nursery.start_soon( + wait_for_proc, proc, subactor, portal, cs) + + if errors: + if not self.cancelled: + # halt here and expect to be called again once the nursery + # has been cancelled externally (ex. from within __aexit__() + # if an error is captured from ``wait()`` then ``cancel()`` + # is called immediately after which in turn calls ``wait()`` + # again.) + raise trio.MultiError(errors) + + # wait on all `start_actor()` subactors to complete + # if errors were captured above and we have not been cancelled + # then these ``start_actor()`` spawned actors will block until + # cancelled externally + children = self._children.copy() + async with trio.open_nursery() as nursery: + for subactor, proc, portal in children.values(): + # TODO: how do we handle remote host spawned actors? nursery.start_soon(wait_for_proc, proc, subactor, portal, cs) + log.debug(f"All subactors for {self} have terminated") + if errors: + # always raise any error if we're also cancelled + raise trio.MultiError(errors) + async def cancel(self, hard_kill: bool = False) -> None: """Cancel this nursery by instructing each subactor to cancel - iteslf and wait for all subprocesses to terminate. + itself and wait for all subactors to terminate. If ``hard_killl`` is set to ``True`` then kill the processes directly without any far end graceful ``trio`` cancellation. @@ -234,56 +288,57 @@ class ActorNursery: # channel/portal should now be up _, _, portal = self._children[subactor.uid] if portal is None: - # cancelled while waiting on the event? 
+ # cancelled while waiting on the event + # to arrive chan = self._actor._peers[subactor.uid][-1] if chan: portal = Portal(chan) else: # there's no other choice left do_hard_kill(proc) - # spawn cancel tasks async + # spawn cancel tasks assert portal n.start_soon(portal.cancel_actor) - log.debug(f"Waiting on all subactors to complete") - await self.wait() + # mark ourselves as having (tried to have) cancelled all subactors self.cancelled = True - log.debug(f"All subactors for {self} have terminated") + await self.wait() async def __aexit__(self, etype, value, tb): """Wait on all subactor's main routines to complete. """ - try: - if etype is not None: + if etype is not None: + try: # XXX: hypothetically an error could be raised and then - # a cancel signal shows up slightly after in which case the - # else block here might not complete? Should both be shielded? + # a cancel signal shows up slightly after in which case + # the `else:` block here might not complete? + # For now, shield both. with trio.open_cancel_scope(shield=True): if etype is trio.Cancelled: log.warning( - f"{current_actor().uid} was cancelled with {etype}" - ", cancelling actor nursery") - await self.cancel() + f"Nursery for {current_actor().uid} was " + f"cancelled with {etype}") else: log.exception( - f"{current_actor().uid} errored with {etype}, " - "cancelling actor nursery") - await self.cancel() - else: - # XXX: this is effectively the lone cancellation/supervisor - # strategy which exactly mimicks trio's behaviour - log.debug(f"Waiting on subactors {self._children} to complete") - try: - await self.wait() - except Exception as err: - log.warning(f"Nursery caught {err}, cancelling") + f"Nursery for {current_actor().uid} " + f"errored with {etype}, ") await self.cancel() - raise - log.debug(f"Nursery teardown complete") - except Exception: - log.exception("Error on nursery exit:") - await self.wait() - raise + except trio.MultiError as merr: + if value not in merr.exceptions: + raise trio.MultiError(merr.exceptions + [value]) + raise + else: + # XXX: this is effectively the (for now) lone + # cancellation/supervisor strategy which exactly + # mimicks trio's behaviour + log.debug(f"Waiting on subactors {self._children} to complete") + try: + await self.wait() + except Exception as err: + log.warning(f"Nursery caught {err}, cancelling") + await self.cancel() + raise + log.debug(f"Nursery teardown complete") @asynccontextmanager @@ -297,3 +352,8 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: # TODO: figure out supervisors from erlang async with ActorNursery(current_actor()) as nursery: yield nursery + + +def is_main_process(): + "Bool determining if this actor is running in the top-most process." + return mp.current_process().name == 'MainProcess' From 9bb8a062ebb0108f5400135841ef7eb6d638badc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 19 Nov 2018 08:47:42 -0500 Subject: [PATCH 04/12] mypy fixes --- tractor/_actor.py | 4 ++-- tractor/_portal.py | 2 +- tractor/_trionics.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 671405d..ab3e434 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -357,10 +357,10 @@ class Actor: # (i.e. no cid was provided in the msg - see above). 
# Push this error to all local channel consumers # (normally portals) by marking the channel as errored - tb_str = msg.pop('tb_str') + tb_str = msg.get('tb_str') assert chan.uid exc = InternalActorError( - f"{self.channel.uid}\n" + tb_str, + f"{chan.uid}\n" + tb_str, **msg, ) chan._exc = exc diff --git a/tractor/_portal.py b/tractor/_portal.py index 83b8e13..28e28f4 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -60,7 +60,7 @@ class Portal: # when this is set to a tuple returned from ``_submit()`` then # it is expected that ``result()`` will be awaited at some point # during the portal's lifetime - self._result = None + self._result: Optional[Any] = None # set when _submit_for_result is called self._expect_result: Optional[ Tuple[str, Any, str, Dict[str, Any]] diff --git a/tractor/_trionics.py b/tractor/_trionics.py index a6c4b73..fb97468 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -222,7 +222,7 @@ class ActorNursery: log.debug(f"Waiting on all subactors to complete") children = self._children.copy() - errors = [] + errors: List[Exception] = [] # wait on run_in_actor() tasks, unblocks when all complete async with trio.open_nursery() as nursery: for subactor, proc, portal in children.values(): From 1bb37dbddfc2a7e912b5947b7800a2ea4583bfa2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 19 Nov 2018 14:15:28 -0500 Subject: [PATCH 05/12] Expose trio.MultiError publicly --- tractor/__init__.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index 61e3645..0667bd5 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -4,10 +4,11 @@ tractor: An actor model micro-framework built on """ import importlib from functools import partial -from typing import Tuple, Any, Optional +from typing import Tuple, Any import typing import trio # type: ignore +from trio import MultiError from .log import get_console_log, get_logger, get_loglevel from ._ipc import _connect_chan, Channel @@ -23,10 +24,11 @@ __all__ = [ 'current_actor', 'find_actor', 'get_arbiter', - 'wait_for_actor', 'open_nursery', - 'RemoteActorError', + 'wait_for_actor', 'Channel', + 'MultiError', + 'RemoteActorError', ] From 82fcf025cc69ed2051cd83194ec2a71a107ca4df Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 19 Nov 2018 14:16:09 -0500 Subject: [PATCH 06/12] Fix: MultiError isn't an Exception... --- tractor/_trionics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/_trionics.py b/tractor/_trionics.py index fb97468..2976edf 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -334,7 +334,7 @@ class ActorNursery: log.debug(f"Waiting on subactors {self._children} to complete") try: await self.wait() - except Exception as err: + except (Exception, trio.MultiError) as err: log.warning(f"Nursery caught {err}, cancelling") await self.cancel() raise From 9102c48810906b70c663e7eb2fb90b75676e866c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 19 Nov 2018 14:16:42 -0500 Subject: [PATCH 07/12] Add multierror cancellation tests --- tests/test_cancellation.py | 101 +++++++++++++++++++++++++++++-------- 1 file changed, 80 insertions(+), 21 deletions(-) diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 01b3af2..0dd33c2 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -14,26 +14,65 @@ async def assert_err(): assert 0 -def test_remote_error(arb_addr): - """Verify an error raises in a subactor is propagated to the parent. 
+@pytest.mark.parametrize( + 'args_err', + [ + # expected to be thrown in assert_err + ({}, AssertionError), + # argument mismatch raised in _invoke() + ({'unexpected': 10}, TypeError) + ], + ids=['no_args', 'unexpected_args'], +) +def test_remote_error(arb_addr, args_err): + """Verify an error raised in a subactor that is propagated + to the parent nursery, contains underlying builtin erorr type + infot and causes cancellation and reraising. + """ + args, errtype = args_err + + async def main(): + async with tractor.open_nursery() as nursery: + + portal = await nursery.run_in_actor('errorer', assert_err, **args) + + # get result(s) from main task + try: + await portal.result() + except tractor.RemoteActorError as err: + assert err.type == errtype + print("Look Maa that actor failed hard, hehh") + raise + else: + assert 0, "Remote error was not raised?" + + with pytest.raises(tractor.RemoteActorError): + # also raises + tractor.run(main, arbiter_addr=arb_addr) + + +def test_multierror(arb_addr): + """Verify we raise a ``trio.MultiError`` out of a nursery where + more then one actor errors. """ async def main(): async with tractor.open_nursery() as nursery: - portal = await nursery.run_in_actor('errorer', assert_err) + await nursery.run_in_actor('errorer1', assert_err) + portal2 = await nursery.run_in_actor('errorer2', assert_err) # get result(s) from main task try: - return await portal.result() - except tractor.RemoteActorError: - print("Look Maa that actor failed hard, hehh") + await portal2.result() + except tractor.RemoteActorError as err: + assert err.type == AssertionError + print("Look Maa that first actor failed hard, hehh") raise - except Exception: - pass - assert 0, "Remote error was not raised?" - with pytest.raises(tractor.RemoteActorError): - # also raises + # here we should get a `trio.MultiError` containing exceptions + # from both subactors + + with pytest.raises(trio.MultiError): tractor.run(main, arbiter_addr=arb_addr) @@ -42,9 +81,12 @@ def do_nothing(): def test_cancel_single_subactor(arb_addr): - - async def main(): - + """Ensure a ``ActorNursery.start_actor()`` spawned subactor + cancels when the nursery is cancelled. + """ + async def spawn_actor(): + """Spawn an actor that blocks indefinitely. + """ async with tractor.open_nursery() as nursery: portal = await nursery.start_actor( @@ -55,7 +97,7 @@ def test_cancel_single_subactor(arb_addr): # would hang otherwise await nursery.cancel() - tractor.run(main, arbiter_addr=arb_addr) + tractor.run(spawn_actor, arbiter_addr=arb_addr) async def stream_forever(): @@ -87,13 +129,22 @@ async def test_cancel_infinite_streamer(): assert n.cancelled +@pytest.mark.parametrize( + 'num_actors_and_errs', + [ + (1, tractor.RemoteActorError, AssertionError), + (2, tractor.MultiError, AssertionError) + ], + ids=['one_actor', 'two_actors'], +) @tractor_test -async def test_one_cancels_all(): +async def test_some_cancels_all(num_actors_and_errs): """Verify one failed actor causes all others in the nursery to be cancelled just like in trio. This is the first and only supervisory strategy at the moment. 
""" + num, first_err, err_type = num_actors_and_errs try: async with tractor.open_nursery() as n: real_actors = [] @@ -103,13 +154,21 @@ async def test_one_cancels_all(): rpc_module_paths=[__name__], )) - # start one actor that will fail immediately - await n.run_in_actor('extra', assert_err) + for i in range(num): + # start one actor that will fail immediately + await n.run_in_actor(f'extra_{i}', assert_err) - # should error here with a ``RemoteActorError`` containing - # an ``AssertionError` + # should error here with a ``RemoteActorError`` or + # ``MultiError`` containing an ``AssertionError` + + except first_err as err: + if isinstance(err, trio.MultiError): + assert len(err.exceptions) == num + for exc in err.exceptions: + assert exc.type == err_type + else: + assert err.type == err_type - except tractor.RemoteActorError: assert n.cancelled is True assert not n._children else: From 7a2b7d9d1469b39326f4cb7c10c98ed7a41e6780 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 19 Nov 2018 14:17:01 -0500 Subject: [PATCH 08/12] Delay tweak for py3.7.1 --- tests/test_multi_program.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py index b85ee98..1a71d6b 100644 --- a/tests/test_multi_program.py +++ b/tests/test_multi_program.py @@ -41,7 +41,7 @@ def daemon(loglevel, testdir, arb_addr): stderr=subprocess.PIPE, ) assert not proc.returncode - wait = 0.6 if sys.version_info < (3, 7) else 0.2 + wait = 0.6 if sys.version_info < (3, 7) else 0.4 time.sleep(wait) yield proc sig_prog(proc, signal.SIGINT) From 0a240187c675c69043056c71d49a210e611bbe48 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 19 Nov 2018 16:52:55 -0500 Subject: [PATCH 09/12] Log the exception when unable to ship back rpc errors --- tractor/_actor.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index ab3e434..1e7d173 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -125,8 +125,11 @@ async def _invoke( log.exception("Actor errored:") err_msg = pack_error(err) err_msg['cid'] = cid - await chan.send(err_msg) - + try: + await chan.send(err_msg) + except trio.ClosedResourceError: + log.exception( + f"Failed to ship error to caller @ {chan.uid}") if cs is None: # error is from above code not from rpc invocation task_status.started(err) From 3e74cc6f113baad63e0dbd03ef6d2c43abfc7d89 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 19 Nov 2018 16:53:21 -0500 Subject: [PATCH 10/12] Verify nested boxed errors --- tests/test_cancellation.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 0dd33c2..98dd8ee 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -162,11 +162,14 @@ async def test_some_cancels_all(num_actors_and_errs): # ``MultiError`` containing an ``AssertionError` except first_err as err: - if isinstance(err, trio.MultiError): + if isinstance(err, tractor.MultiError): assert len(err.exceptions) == num for exc in err.exceptions: - assert exc.type == err_type - else: + if isinstance(exc, tractor.RemoteActorError): + assert exc.type == err_type + else: + assert isinstance(exc, trio.Cancelled) + elif isinstance(err, tractor.RemoteActorError): assert err.type == err_type assert n.cancelled is True From a482681f9c3c08dbc460c3377c018046181b750a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 22 Nov 2018 11:43:04 -0500 Subject: [PATCH 11/12] Leverage 
`pytest.raises()` better; fix a bunch of docs --- tests/test_cancellation.py | 22 +++++++++++----------- tractor/_trionics.py | 14 +++++++++----- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 98dd8ee..a6e2813 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -26,8 +26,9 @@ async def assert_err(): ) def test_remote_error(arb_addr, args_err): """Verify an error raised in a subactor that is propagated - to the parent nursery, contains underlying builtin erorr type - infot and causes cancellation and reraising. + to the parent nursery, contains the underlying boxed builtin + error type info and causes cancellation and reraising all the + way up the stack. """ args, errtype = args_err @@ -43,13 +44,13 @@ def test_remote_error(arb_addr, args_err): assert err.type == errtype print("Look Maa that actor failed hard, hehh") raise - else: - assert 0, "Remote error was not raised?" - with pytest.raises(tractor.RemoteActorError): - # also raises + with pytest.raises(tractor.RemoteActorError) as excinfo: tractor.run(main, arbiter_addr=arb_addr) + # ensure boxed error is correct + assert excinfo.value.type == errtype + def test_multierror(arb_addr): """Verify we raise a ``trio.MultiError`` out of a nursery where @@ -139,8 +140,8 @@ async def test_cancel_infinite_streamer(): ) @tractor_test async def test_some_cancels_all(num_actors_and_errs): - """Verify one failed actor causes all others in the nursery - to be cancelled just like in trio. + """Verify a subset of failed subactors causes all others in + the nursery to be cancelled just like the strategy in trio. This is the first and only supervisory strategy at the moment. """ @@ -155,11 +156,10 @@ async def test_some_cancels_all(num_actors_and_errs): )) for i in range(num): - # start one actor that will fail immediately + # start actor(s) that will fail immediately await n.run_in_actor(f'extra_{i}', assert_err) - # should error here with a ``RemoteActorError`` or - # ``MultiError`` containing an ``AssertionError` + # should error here with a ``RemoteActorError`` or ``MultiError`` except first_err as err: if isinstance(err, tractor.MultiError): diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 2976edf..289e42b 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -151,7 +151,7 @@ class ActorNursery: error propagation, and graceful subprocess tear down. """ async def exhaust_portal(portal, actor): - """Pull final result from portal (assuming it was one). + """Pull final result from portal (assuming it has one). If the main task is an async generator do our best to consume what's left of it. 
@@ -169,7 +169,7 @@ class ActorNursery: async for item in agen: log.debug(f"Consuming item {item}") final.append(item) - except Exception as err: + except (Exception, trio.MultiError) as err: # we reraise in the parent task via a ``trio.MultiError`` return err else: @@ -187,8 +187,9 @@ class ActorNursery: """ with trio.open_cancel_scope() as cs: task_status.started(cs) - # this may error in which case we expect the far end - # actor to have already terminated itself + # if this call errors we store the exception for later + # in ``errors`` which will be reraised inside + # a MultiError and we still send out a cancel request result = await exhaust_portal(portal, actor) if isinstance(result, Exception): errors.append(result) @@ -221,6 +222,8 @@ class ActorNursery: cancel_scope.cancel() log.debug(f"Waiting on all subactors to complete") + # since we pop each child subactor on termination, + # iterate a copy children = self._children.copy() errors: List[Exception] = [] # wait on run_in_actor() tasks, unblocks when all complete @@ -355,5 +358,6 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: def is_main_process(): - "Bool determining if this actor is running in the top-most process." + """Bool determining if this actor is running in the top-most process. + """ return mp.current_process().name == 'MainProcess' From 23c7519fecc8d43482bfa6395e58ba6be7b44e6b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 22 Nov 2018 14:25:31 -0500 Subject: [PATCH 12/12] Jeeze 3.7.1 got even faster? --- tests/test_streaming.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 31f85a0..5691fbd 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -166,7 +166,7 @@ def test_a_quadruple_example(time_quad_ex): @pytest.mark.parametrize( 'cancel_delay', - list(map(lambda i: i/10, range(2, 8))) + list(map(lambda i: i/10, range(3, 9))) ) def test_not_fast_enough_quad(arb_addr, time_quad_ex, cancel_delay): """Verify we can cancel midway through the quad example and all actors