Attach remote internal errors to channels

This ensures that internal errors received from a remote actor are
actually raised, even in the `MainProcess`, **before** comms tasks are
cancelled. An internal error in this case means any error packet received
on a channel where the packet itself has no `cid` header. RPC errors (which
**do** have a `cid` header) are still forwarded to the consuming caller as
usual.
wait_for_actor
Tyler Goodlet 2018-08-17 14:49:17 -04:00
parent 901f99bbec
commit 3202462cd5
2 changed files with 32 additions and 26 deletions

View File

@ -187,7 +187,7 @@ class Actor:
for path in self.rpc_module_paths: for path in self.rpc_module_paths:
self._mods[path] = importlib.import_module(path) self._mods[path] = importlib.import_module(path)
# XXX: triggers an internal error which causes a hanging # XXX: triggers an internal error which can cause a hanging
# problem (without the recently added .throw()) on teardown # problem (without the recently added .throw()) on teardown
# (root nursery tears down thus killing all channels before # (root nursery tears down thus killing all channels before
# sending cancels to subactors during actor nursery teardown # sending cancels to subactors during actor nursery teardown
@ -302,19 +302,22 @@ class Actor:
log.debug(f"Received msg {msg} from {chan.uid}") log.debug(f"Received msg {msg} from {chan.uid}")
cid = msg.get('cid') cid = msg.get('cid')
if cid: if cid:
if cid == 'internal': # internal actor error
raise InternalActorError(
f"{chan.uid}\n" + msg['error'])
# deliver response to local caller/waiter # deliver response to local caller/waiter
await self._push_result(chan.uid, cid, msg) await self._push_result(chan.uid, cid, msg)
log.debug( log.debug(
f"Waiting on next msg for {chan} from {chan.uid}") f"Waiting on next msg for {chan} from {chan.uid}")
continue continue
# process command request # process command request
ns, funcname, kwargs, actorid, cid = msg['cmd'] try:
ns, funcname, kwargs, actorid, cid = msg['cmd']
except KeyError:
# push any non-rpc-response error to all local consumers
# and mark the channel as errored
chan._exc = err = msg['error']
for cid in self._actors2calls[chan.uid]:
await self._push_result(chan.uid, cid, msg)
raise InternalActorError(f"{chan.uid}\n" + err)
log.debug( log.debug(
f"Processing request from {actorid}\n" f"Processing request from {actorid}\n"
@ -355,23 +358,16 @@ class Actor:
else: # channel disconnect else: # channel disconnect
log.debug(f"{chan} from {chan.uid} disconnected") log.debug(f"{chan} from {chan.uid} disconnected")
except InternalActorError as err:
# ship internal errors upwards
log.exception("Received internal error:")
if self._parent_chan:
await self._parent_chan.send(
{'error': traceback.format_exc(), 'cid': 'internal'})
raise
else:
assert self._main_coro
self._main_coro.throw(err)
except trio.ClosedResourceError: except trio.ClosedResourceError:
log.error(f"{chan} form {chan.uid} broke") log.error(f"{chan} form {chan.uid} broke")
except Exception: except Exception:
# ship exception (from above code) to peer as an internal error # ship exception (from above code) to parent
await chan.send( log.exception("Actor errored:")
{'error': traceback.format_exc(), 'cid': 'internal'}) if self._parent_chan:
raise await self._parent_chan.send({'error': traceback.format_exc()})
raise
# if this is the `MainProcess` we expect the error broadcasting
# above to trigger an error at consuming portal "checkpoints"
finally: finally:
log.debug(f"Exiting msg loop for {chan} from {chan.uid}") log.debug(f"Exiting msg loop for {chan} from {chan.uid}")
@ -471,7 +467,8 @@ class Actor:
if self._parent_chan: if self._parent_chan:
try: try:
await self._parent_chan.send( await self._parent_chan.send(
{'error': traceback.format_exc(), 'cid': 'internal'}) # {'error': traceback.format_exc(), 'cid': 'internal'})
{'error': traceback.format_exc()})
except trio.ClosedResourceError: except trio.ClosedResourceError:
log.error( log.error(
f"Failed to ship error to parent " f"Failed to ship error to parent "
@ -480,7 +477,8 @@ class Actor:
if not registered_with_arbiter: if not registered_with_arbiter:
log.exception( log.exception(
f"Failed to register with arbiter @ {arbiter_addr}") f"Actor errored and failed to register with arbiter "
f"@ {arbiter_addr}")
else: else:
raise raise
finally: finally:

View File

@ -149,9 +149,16 @@ class Portal:
"""Return the result(s) from the remote actor's "main" task. """Return the result(s) from the remote actor's "main" task.
""" """
if self._expect_result is None: if self._expect_result is None:
raise RuntimeError( # (remote) errors are slapped on the channel
f"Portal for {self.channel.uid} is not expecting a final" # teardown can reraise them
"result?") exc = self.channel._exc
if exc:
raise RemoteActorError(f"{self.channel.uid}\n" + exc)
else:
raise RuntimeError(
f"Portal for {self.channel.uid} is not expecting a final"
"result?")
elif self._result is None: elif self._result is None:
self._result = await self._return_from_resptype( self._result = await self._return_from_resptype(
*self._expect_result *self._expect_result
@ -184,6 +191,7 @@ class Portal:
log.warn(f"May have failed to cancel {self.channel.uid}") log.warn(f"May have failed to cancel {self.channel.uid}")
return False return False
class LocalPortal: class LocalPortal:
"""A 'portal' to a local ``Actor``. """A 'portal' to a local ``Actor``.