diff --git a/tractor/__init__.py b/tractor/__init__.py index ca2055f..61e3645 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -16,7 +16,7 @@ from ._actor import ( ) from ._trionics import open_nursery from ._state import current_actor -from ._portal import RemoteActorError +from ._exceptions import RemoteActorError __all__ = [ diff --git a/tractor/_actor.py b/tractor/_actor.py index 8d686d5..671405d 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -16,6 +16,7 @@ from async_generator import asynccontextmanager, aclosing from ._ipc import Channel, _connect_chan from .log import get_console_log, get_logger +from ._exceptions import pack_error, InternalActorError from ._portal import ( Portal, open_portal, @@ -33,10 +34,6 @@ class ActorFailure(Exception): "General actor failure" -class InternalActorError(RuntimeError): - "Actor primitive internals failure" - - async def _invoke( actor: 'Actor', cid: str, @@ -49,6 +46,7 @@ async def _invoke( """ sig = inspect.signature(func) treat_as_gen = False + cs = None if 'chan' in sig.parameters: assert 'cid' in sig.parameters, \ f"{func} must accept a `cid` (caller id) kwarg" @@ -122,10 +120,16 @@ async def _invoke( with trio.open_cancel_scope() as cs: task_status.started(cs) await chan.send({'return': await coro, 'cid': cid}) - except Exception: + except Exception as err: # always ship errors back to caller log.exception("Actor errored:") - await chan.send({'error': traceback.format_exc(), 'cid': cid}) + err_msg = pack_error(err) + err_msg['cid'] = cid + await chan.send(err_msg) + + if cs is None: + # error is from above code not from rpc invocation + task_status.started(err) finally: # RPC task bookeeping tasks = actor._rpc_tasks.get(chan, None) @@ -348,13 +352,19 @@ class Actor: try: ns, funcname, kwargs, actorid, cid = msg['cmd'] except KeyError: - # push any non-rpc-response error to all local consumers - # and mark the channel as errored - chan._exc = err = msg['error'] + # This is the non-rpc error case, that is, an + # error **not** raised inside a call to ``_invoke()`` + # (i.e. no cid was provided in the msg - see above). + # Push this error to all local channel consumers + # (normally portals) by marking the channel as errored + tb_str = msg.pop('tb_str') assert chan.uid - for cid in self._actors2calls[chan.uid]: - await self._push_result(chan.uid, cid, msg) - raise InternalActorError(f"{chan.uid}\n" + err) + exc = InternalActorError( + f"{self.channel.uid}\n" + tb_str, + **msg, + ) + chan._exc = exc + raise exc log.debug( f"Processing request from {actorid}\n" @@ -373,22 +383,30 @@ class Actor: # never allow cancelling cancel requests (results in # deadlock and other weird behaviour) if func != self.cancel: - self._no_more_rpc_tasks.clear() - log.info(f"RPC func is {func}") - self._rpc_tasks.setdefault(chan, []).append((cs, func)) + if isinstance(cs, Exception): + log.warn(f"Task for RPC func {func} failed with {cs}") + else: + # mark that we have ongoing rpc tasks + self._no_more_rpc_tasks.clear() + log.info(f"RPC func is {func}") + # store cancel scope such that the rpc task can be + # cancelled gracefully if requested + self._rpc_tasks.setdefault(chan, []).append((cs, func)) log.debug( f"Waiting on next msg for {chan} from {chan.uid}") - else: # channel disconnect + else: + # channel disconnect log.debug(f"{chan} from {chan.uid} disconnected") except trio.ClosedResourceError: log.error(f"{chan} form {chan.uid} broke") - except Exception: - # ship exception (from above code) to parent + except Exception as err: + # ship any "internal" exception (i.e. one from internal machinery + # not from an rpc task) to parent log.exception("Actor errored:") if self._parent_chan: - await self._parent_chan.send({'error': traceback.format_exc()}) - raise + await self._parent_chan.send(pack_error(err)) + raise # if this is the `MainProcess` we expect the error broadcasting # above to trigger an error at consuming portal "checkpoints" finally: @@ -480,25 +498,30 @@ class Actor: # blocks here as expected until the channel server is # killed (i.e. this actor is cancelled or signalled by the parent) - except Exception: + except Exception as err: + if not registered_with_arbiter: + log.exception( + f"Actor errored and failed to register with arbiter " + f"@ {arbiter_addr}") + if self._parent_chan: try: + # internal error so ship to parent without cid await self._parent_chan.send( - {'error': traceback.format_exc()}) + pack_error(err)) except trio.ClosedResourceError: log.error( f"Failed to ship error to parent " f"{self._parent_chan.uid}, channel was closed") log.exception("Actor errored:") - - if not registered_with_arbiter: - log.exception( - f"Actor errored and failed to register with arbiter " - f"@ {arbiter_addr}") else: + # XXX wait, why? + # causes a hang if I always raise.. raise + finally: - await self._do_unreg(arbiter_addr) + if registered_with_arbiter: + await self._do_unreg(arbiter_addr) # terminate actor once all it's peers (actors that connected # to it as clients) have disappeared if not self._no_more_peers.is_set(): diff --git a/tractor/_portal.py b/tractor/_portal.py index 9dc0cba..83b8e13 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -12,15 +12,12 @@ from async_generator import asynccontextmanager from ._state import current_actor from ._ipc import Channel from .log import get_logger +from ._exceptions import unpack_error, NoResult, RemoteActorError log = get_logger('tractor') -class RemoteActorError(RuntimeError): - "Remote actor exception bundled locally" - - @asynccontextmanager async def maybe_open_nursery(nursery: trio._core._run.Nursery = None): """Create a new nursery if None provided. @@ -64,7 +61,7 @@ class Portal: # it is expected that ``result()`` will be awaited at some point # during the portal's lifetime self._result = None - self._exc: Optional[RemoteActorError] = None + # set when _submit_for_result is called self._expect_result: Optional[ Tuple[str, Any, str, Dict[str, Any]] ] = None @@ -97,8 +94,7 @@ class Portal: elif functype == 'asyncgen': resp_type = 'yield' elif 'error' in first_msg: - raise RemoteActorError( - f"{self.channel.uid}\n" + first_msg['error']) + raise unpack_error(first_msg, self.channel) else: raise ValueError(f"{first_msg} is an invalid response packet?") @@ -110,10 +106,11 @@ class Portal: self._expect_result = await self._submit(ns, func, **kwargs) async def run(self, ns: str, func: str, **kwargs) -> Any: - """Submit a function to be scheduled and run by actor, wrap and return - its (stream of) result(s). + """Submit a remote function to be scheduled and run by actor, + wrap and return its (stream of) result(s). - This is a blocking call. + This is a blocking call and returns either a value from the + remote rpc task or a local async generator instance. """ return await self._return_from_resptype( *(await self._submit(ns, func, **kwargs)) @@ -137,14 +134,19 @@ class Portal: if 'stop' in msg: break # far end async gen terminated else: - raise RemoteActorError( - f"{self.channel.uid}\n" + msg['error']) + # internal error should never get here + assert msg.get('cid'), ( + "Received internal error at portal?") + raise unpack_error(msg, self.channel) + except StopAsyncIteration: log.debug( f"Cancelling async gen call {cid} to " f"{self.channel.uid}") raise + # TODO: use AsyncExitStack to aclose() all agens + # on teardown return yield_from_q() elif resptype == 'return': @@ -152,30 +154,43 @@ class Portal: try: return msg['return'] except KeyError: - self._exc = RemoteActorError( - f"{self.channel.uid}\n" + msg['error']) - raise self._exc + # internal error should never get here + assert msg.get('cid'), "Received internal error at portal?" + raise unpack_error(msg, self.channel) else: raise ValueError(f"Unknown msg response type: {first_msg}") async def result(self) -> Any: """Return the result(s) from the remote actor's "main" task. """ - if self._expect_result is None: - # (remote) errors are slapped on the channel - # teardown can reraise them - exc = self.channel._exc - if exc: - raise RemoteActorError(f"{self.channel.uid}\n{exc}") - else: - raise RuntimeError( - f"Portal for {self.channel.uid} is not expecting a final" - "result?") + # Check for non-rpc errors slapped on the + # channel for which we always raise + exc = self.channel._exc + if exc: + raise exc + + # not expecting a "main" result + if self._expect_result is None: + log.warn( + f"Portal for {self.channel.uid} not expecting a final" + " result?\nresult() should only be called if subactor" + " was spawned with `ActorNursery.run_in_actor()`") + return NoResult + + # expecting a "main" result + assert self._expect_result + if self._result is None: + try: + self._result = await self._return_from_resptype( + *self._expect_result + ) + except RemoteActorError as err: + self._result = err + + # re-raise error on every call + if isinstance(self._result, RemoteActorError): + raise self._result - elif self._result is None: - self._result = await self._return_from_resptype( - *self._expect_result - ) return self._result async def close(self) -> None: