From 3202462cd5396e4767cc27a1ef805885672f86fe Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 17 Aug 2018 14:49:17 -0400 Subject: [PATCH] Attach remote internal errors to channels This ensures that internal errors received from a remote actor are indeed raised even in the `MainProcess` **before** comms tasks are cancelled. Internal error in this case means any error packet received on a channel that doesn't have a `cid` header. RPC errors (which **do** have a `cid` header) are still forwarded to the consuming caller as usual. --- tractor/_actor.py | 44 +++++++++++++++++++++----------------------- tractor/_portal.py | 14 +++++++++++--- 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 1780824..a5e833f 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -187,7 +187,7 @@ class Actor: for path in self.rpc_module_paths: self._mods[path] = importlib.import_module(path) - # XXX: triggers an internal error which causes a hanging + # XXX: triggers an internal error which can cause a hanging # problem (without the recently added .throw()) on teardown # (root nursery tears down thus killing all channels before # sending cancels to subactors during actor nursery teardown @@ -302,19 +302,22 @@ class Actor: log.debug(f"Received msg {msg} from {chan.uid}") cid = msg.get('cid') if cid: - if cid == 'internal': # internal actor error - raise InternalActorError( - f"{chan.uid}\n" + msg['error']) - # deliver response to local caller/waiter await self._push_result(chan.uid, cid, msg) - log.debug( f"Waiting on next msg for {chan} from {chan.uid}") continue # process command request - ns, funcname, kwargs, actorid, cid = msg['cmd'] + try: + ns, funcname, kwargs, actorid, cid = msg['cmd'] + except KeyError: + # push any non-rpc-response error to all local consumers + # and mark the channel as errored + chan._exc = err = msg['error'] + for cid in self._actors2calls[chan.uid]: + await self._push_result(chan.uid, cid, msg) + raise InternalActorError(f"{chan.uid}\n" + err) log.debug( f"Processing request from {actorid}\n" @@ -355,23 +358,16 @@ class Actor: else: # channel disconnect log.debug(f"{chan} from {chan.uid} disconnected") - except InternalActorError as err: - # ship internal errors upwards - log.exception("Received internal error:") - if self._parent_chan: - await self._parent_chan.send( - {'error': traceback.format_exc(), 'cid': 'internal'}) - raise - else: - assert self._main_coro - self._main_coro.throw(err) except trio.ClosedResourceError: log.error(f"{chan} form {chan.uid} broke") except Exception: - # ship exception (from above code) to peer as an internal error - await chan.send( - {'error': traceback.format_exc(), 'cid': 'internal'}) - raise + # ship exception (from above code) to parent + log.exception("Actor errored:") + if self._parent_chan: + await self._parent_chan.send({'error': traceback.format_exc()}) + raise + # if this is the `MainProcess` we expect the error broadcasting + # above to trigger an error at consuming portal "checkpoints" finally: log.debug(f"Exiting msg loop for {chan} from {chan.uid}") @@ -471,7 +467,8 @@ class Actor: if self._parent_chan: try: await self._parent_chan.send( - {'error': traceback.format_exc(), 'cid': 'internal'}) + # {'error': traceback.format_exc(), 'cid': 'internal'}) + {'error': traceback.format_exc()}) except trio.ClosedResourceError: log.error( f"Failed to ship error to parent " @@ -480,7 +477,8 @@ class Actor: if not registered_with_arbiter: log.exception( - f"Failed to register with arbiter @ {arbiter_addr}") + f"Actor errored and failed to register with arbiter " + f"@ {arbiter_addr}") else: raise finally: diff --git a/tractor/_portal.py b/tractor/_portal.py index 169249a..835f918 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -149,9 +149,16 @@ class Portal: """Return the result(s) from the remote actor's "main" task. """ if self._expect_result is None: - raise RuntimeError( - f"Portal for {self.channel.uid} is not expecting a final" - "result?") + # (remote) errors are slapped on the channel + # teardown can reraise them + exc = self.channel._exc + if exc: + raise RemoteActorError(f"{self.channel.uid}\n" + exc) + else: + raise RuntimeError( + f"Portal for {self.channel.uid} is not expecting a final" + "result?") + elif self._result is None: self._result = await self._return_from_resptype( *self._expect_result @@ -184,6 +191,7 @@ class Portal: log.warn(f"May have failed to cancel {self.channel.uid}") return False + class LocalPortal: """A 'portal' to a local ``Actor``.