Improve error propagation machinery

Use the new custom error types throughout the actor and portal
primitives and set a few new rules:
- internal errors are any error not raised by an rpc task and are
  **not** forwarded to portals but instead are raised directly in
  the msg loop.
- portals always re-raise a "main task" error for every call to
  ``Portal.result()``.
improved_errors
Tyler Goodlet 2018-11-19 04:05:07 -05:00
parent 2f6609ab78
commit e75b25dc21
3 changed files with 96 additions and 58 deletions

View File

@ -16,7 +16,7 @@ from ._actor import (
) )
from ._trionics import open_nursery from ._trionics import open_nursery
from ._state import current_actor from ._state import current_actor
from ._portal import RemoteActorError from ._exceptions import RemoteActorError
__all__ = [ __all__ = [

View File

@ -16,6 +16,7 @@ from async_generator import asynccontextmanager, aclosing
from ._ipc import Channel, _connect_chan from ._ipc import Channel, _connect_chan
from .log import get_console_log, get_logger from .log import get_console_log, get_logger
from ._exceptions import pack_error, InternalActorError
from ._portal import ( from ._portal import (
Portal, Portal,
open_portal, open_portal,
@ -33,10 +34,6 @@ class ActorFailure(Exception):
"General actor failure" "General actor failure"
class InternalActorError(RuntimeError):
"Actor primitive internals failure"
async def _invoke( async def _invoke(
actor: 'Actor', actor: 'Actor',
cid: str, cid: str,
@ -49,6 +46,7 @@ async def _invoke(
""" """
sig = inspect.signature(func) sig = inspect.signature(func)
treat_as_gen = False treat_as_gen = False
cs = None
if 'chan' in sig.parameters: if 'chan' in sig.parameters:
assert 'cid' in sig.parameters, \ assert 'cid' in sig.parameters, \
f"{func} must accept a `cid` (caller id) kwarg" f"{func} must accept a `cid` (caller id) kwarg"
@ -122,10 +120,16 @@ async def _invoke(
with trio.open_cancel_scope() as cs: with trio.open_cancel_scope() as cs:
task_status.started(cs) task_status.started(cs)
await chan.send({'return': await coro, 'cid': cid}) await chan.send({'return': await coro, 'cid': cid})
except Exception: except Exception as err:
# always ship errors back to caller # always ship errors back to caller
log.exception("Actor errored:") log.exception("Actor errored:")
await chan.send({'error': traceback.format_exc(), 'cid': cid}) err_msg = pack_error(err)
err_msg['cid'] = cid
await chan.send(err_msg)
if cs is None:
# error is from above code not from rpc invocation
task_status.started(err)
finally: finally:
# RPC task bookeeping # RPC task bookeeping
tasks = actor._rpc_tasks.get(chan, None) tasks = actor._rpc_tasks.get(chan, None)
@ -348,13 +352,19 @@ class Actor:
try: try:
ns, funcname, kwargs, actorid, cid = msg['cmd'] ns, funcname, kwargs, actorid, cid = msg['cmd']
except KeyError: except KeyError:
# push any non-rpc-response error to all local consumers # This is the non-rpc error case, that is, an
# and mark the channel as errored # error **not** raised inside a call to ``_invoke()``
chan._exc = err = msg['error'] # (i.e. no cid was provided in the msg - see above).
# Push this error to all local channel consumers
# (normally portals) by marking the channel as errored
tb_str = msg.pop('tb_str')
assert chan.uid assert chan.uid
for cid in self._actors2calls[chan.uid]: exc = InternalActorError(
await self._push_result(chan.uid, cid, msg) f"{self.channel.uid}\n" + tb_str,
raise InternalActorError(f"{chan.uid}\n" + err) **msg,
)
chan._exc = exc
raise exc
log.debug( log.debug(
f"Processing request from {actorid}\n" f"Processing request from {actorid}\n"
@ -373,22 +383,30 @@ class Actor:
# never allow cancelling cancel requests (results in # never allow cancelling cancel requests (results in
# deadlock and other weird behaviour) # deadlock and other weird behaviour)
if func != self.cancel: if func != self.cancel:
self._no_more_rpc_tasks.clear() if isinstance(cs, Exception):
log.info(f"RPC func is {func}") log.warn(f"Task for RPC func {func} failed with {cs}")
self._rpc_tasks.setdefault(chan, []).append((cs, func)) else:
# mark that we have ongoing rpc tasks
self._no_more_rpc_tasks.clear()
log.info(f"RPC func is {func}")
# store cancel scope such that the rpc task can be
# cancelled gracefully if requested
self._rpc_tasks.setdefault(chan, []).append((cs, func))
log.debug( log.debug(
f"Waiting on next msg for {chan} from {chan.uid}") f"Waiting on next msg for {chan} from {chan.uid}")
else: # channel disconnect else:
# channel disconnect
log.debug(f"{chan} from {chan.uid} disconnected") log.debug(f"{chan} from {chan.uid} disconnected")
except trio.ClosedResourceError: except trio.ClosedResourceError:
log.error(f"{chan} form {chan.uid} broke") log.error(f"{chan} form {chan.uid} broke")
except Exception: except Exception as err:
# ship exception (from above code) to parent # ship any "internal" exception (i.e. one from internal machinery
# not from an rpc task) to parent
log.exception("Actor errored:") log.exception("Actor errored:")
if self._parent_chan: if self._parent_chan:
await self._parent_chan.send({'error': traceback.format_exc()}) await self._parent_chan.send(pack_error(err))
raise raise
# if this is the `MainProcess` we expect the error broadcasting # if this is the `MainProcess` we expect the error broadcasting
# above to trigger an error at consuming portal "checkpoints" # above to trigger an error at consuming portal "checkpoints"
finally: finally:
@ -480,25 +498,30 @@ class Actor:
# blocks here as expected until the channel server is # blocks here as expected until the channel server is
# killed (i.e. this actor is cancelled or signalled by the parent) # killed (i.e. this actor is cancelled or signalled by the parent)
except Exception: except Exception as err:
if not registered_with_arbiter:
log.exception(
f"Actor errored and failed to register with arbiter "
f"@ {arbiter_addr}")
if self._parent_chan: if self._parent_chan:
try: try:
# internal error so ship to parent without cid
await self._parent_chan.send( await self._parent_chan.send(
{'error': traceback.format_exc()}) pack_error(err))
except trio.ClosedResourceError: except trio.ClosedResourceError:
log.error( log.error(
f"Failed to ship error to parent " f"Failed to ship error to parent "
f"{self._parent_chan.uid}, channel was closed") f"{self._parent_chan.uid}, channel was closed")
log.exception("Actor errored:") log.exception("Actor errored:")
if not registered_with_arbiter:
log.exception(
f"Actor errored and failed to register with arbiter "
f"@ {arbiter_addr}")
else: else:
# XXX wait, why?
# causes a hang if I always raise..
raise raise
finally: finally:
await self._do_unreg(arbiter_addr) if registered_with_arbiter:
await self._do_unreg(arbiter_addr)
# terminate actor once all it's peers (actors that connected # terminate actor once all it's peers (actors that connected
# to it as clients) have disappeared # to it as clients) have disappeared
if not self._no_more_peers.is_set(): if not self._no_more_peers.is_set():

View File

@ -12,15 +12,12 @@ from async_generator import asynccontextmanager
from ._state import current_actor from ._state import current_actor
from ._ipc import Channel from ._ipc import Channel
from .log import get_logger from .log import get_logger
from ._exceptions import unpack_error, NoResult, RemoteActorError
log = get_logger('tractor') log = get_logger('tractor')
class RemoteActorError(RuntimeError):
"Remote actor exception bundled locally"
@asynccontextmanager @asynccontextmanager
async def maybe_open_nursery(nursery: trio._core._run.Nursery = None): async def maybe_open_nursery(nursery: trio._core._run.Nursery = None):
"""Create a new nursery if None provided. """Create a new nursery if None provided.
@ -64,7 +61,7 @@ class Portal:
# it is expected that ``result()`` will be awaited at some point # it is expected that ``result()`` will be awaited at some point
# during the portal's lifetime # during the portal's lifetime
self._result = None self._result = None
self._exc: Optional[RemoteActorError] = None # set when _submit_for_result is called
self._expect_result: Optional[ self._expect_result: Optional[
Tuple[str, Any, str, Dict[str, Any]] Tuple[str, Any, str, Dict[str, Any]]
] = None ] = None
@ -97,8 +94,7 @@ class Portal:
elif functype == 'asyncgen': elif functype == 'asyncgen':
resp_type = 'yield' resp_type = 'yield'
elif 'error' in first_msg: elif 'error' in first_msg:
raise RemoteActorError( raise unpack_error(first_msg, self.channel)
f"{self.channel.uid}\n" + first_msg['error'])
else: else:
raise ValueError(f"{first_msg} is an invalid response packet?") raise ValueError(f"{first_msg} is an invalid response packet?")
@ -110,10 +106,11 @@ class Portal:
self._expect_result = await self._submit(ns, func, **kwargs) self._expect_result = await self._submit(ns, func, **kwargs)
async def run(self, ns: str, func: str, **kwargs) -> Any: async def run(self, ns: str, func: str, **kwargs) -> Any:
"""Submit a function to be scheduled and run by actor, wrap and return """Submit a remote function to be scheduled and run by actor,
its (stream of) result(s). wrap and return its (stream of) result(s).
This is a blocking call. This is a blocking call and returns either a value from the
remote rpc task or a local async generator instance.
""" """
return await self._return_from_resptype( return await self._return_from_resptype(
*(await self._submit(ns, func, **kwargs)) *(await self._submit(ns, func, **kwargs))
@ -137,14 +134,19 @@ class Portal:
if 'stop' in msg: if 'stop' in msg:
break # far end async gen terminated break # far end async gen terminated
else: else:
raise RemoteActorError( # internal error should never get here
f"{self.channel.uid}\n" + msg['error']) assert msg.get('cid'), (
"Received internal error at portal?")
raise unpack_error(msg, self.channel)
except StopAsyncIteration: except StopAsyncIteration:
log.debug( log.debug(
f"Cancelling async gen call {cid} to " f"Cancelling async gen call {cid} to "
f"{self.channel.uid}") f"{self.channel.uid}")
raise raise
# TODO: use AsyncExitStack to aclose() all agens
# on teardown
return yield_from_q() return yield_from_q()
elif resptype == 'return': elif resptype == 'return':
@ -152,30 +154,43 @@ class Portal:
try: try:
return msg['return'] return msg['return']
except KeyError: except KeyError:
self._exc = RemoteActorError( # internal error should never get here
f"{self.channel.uid}\n" + msg['error']) assert msg.get('cid'), "Received internal error at portal?"
raise self._exc raise unpack_error(msg, self.channel)
else: else:
raise ValueError(f"Unknown msg response type: {first_msg}") raise ValueError(f"Unknown msg response type: {first_msg}")
async def result(self) -> Any: async def result(self) -> Any:
"""Return the result(s) from the remote actor's "main" task. """Return the result(s) from the remote actor's "main" task.
""" """
if self._expect_result is None: # Check for non-rpc errors slapped on the
# (remote) errors are slapped on the channel # channel for which we always raise
# teardown can reraise them exc = self.channel._exc
exc = self.channel._exc if exc:
if exc: raise exc
raise RemoteActorError(f"{self.channel.uid}\n{exc}")
else: # not expecting a "main" result
raise RuntimeError( if self._expect_result is None:
f"Portal for {self.channel.uid} is not expecting a final" log.warn(
"result?") f"Portal for {self.channel.uid} not expecting a final"
" result?\nresult() should only be called if subactor"
" was spawned with `ActorNursery.run_in_actor()`")
return NoResult
# expecting a "main" result
assert self._expect_result
if self._result is None:
try:
self._result = await self._return_from_resptype(
*self._expect_result
)
except RemoteActorError as err:
self._result = err
# re-raise error on every call
if isinstance(self._result, RemoteActorError):
raise self._result
elif self._result is None:
self._result = await self._return_from_resptype(
*self._expect_result
)
return self._result return self._result
async def close(self) -> None: async def close(self) -> None: