From cf48fdecfeb76cdf67a3f779d98c086e75659dd6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 9 Apr 2024 08:44:06 -0400 Subject: [PATCH] Unify `MsgTypeError` as a `RemoteActorError` subtype Since in the receive-side error case the source of the exception is the sender side (normally causing a local `TypeError` at decode time), might as well bundle the error in remote-capture-style using boxing semantics around the causing local type error raised from the `msgspec.msgpack.Decoder.decode()` and with a traceback packed from `msgspec`-specific knowledge of any field-type spec matching failure. Deats on new `MsgTypeError` interface: - includes a `.msg_dict` to get access to any `Decoder.type`-applied load of the original (underlying and offending) IPC msg into a `dict` form using a vanilla decoder which is normally packed into the instance as a `._msg_dict`. - a public getter to the "supposed offending msg" via `.payload_msg` which attempts to take the above `.msg_dict` and load it manually into the corresponding `.msg.types.MsgType` struct. - a constructor `.from_decode()` to make it simple to build out error instances from a failed decode scope where the aforementioned `msgdict: dict` from the vanilla decode can be provided directly. - ALSO, we now pack into `MsgTypeError` directly just like ctxc in `unpack_error()` This also completes the while-standing todo for `RemoteActorError` to contain a ref to the underlying `Error` msg as `._ipc_msg` with public `@property` access that `defstruct()`-creates a pretty struct version via `.ipc_msg`. Internal tweaks for this include: - `._ipc_msg` is the internal literal `Error`-msg instance if provided with `.ipc_msg` the dynamic wrapper as mentioned above. - `.__init__()` now can still take variable `**extra_msgdata` (similar to the `dict`-msgdata as before) to maintain support for subtypes which are constructed manually (not only by `pack_error()`) and insert their own attrs which get placed in a `._extra_msgdata: dict` if no `ipc_msg: Error` is provided as input. - the `.msgdata` is now a merge of any `._extra_msgdata` and a `dict`-casted form of any `._ipc_msg`. - adjust all previous `.msgdata` field lookups to try equivalent field reads on `._ipc_msg: Error`. - drop default single ws indent from `.tb_str` and do a failover lookup to `.msgdata` when `._ipc_msg is None` for the manually constructed subtype-instance case. - add a new class attr `.extra_body_fields: list[str]` to allow subtypes to declare attrs they want shown in the `.__repr__()` output, eg. `ContextCancelled.canceller`, `StreamOverrun.sender` and `MsgTypeError.payload_msg`. - ^-rework defaults pertaining to-^ with rename from `_msgdata_keys` -> `_ipcmsg_keys` with latter now just loading directly from the `Error` fields def and `_body_fields: list[str]` just taking that value and removing the not-so-useful-in-REPL or already shown (i.e. `.tb_str: str`) field names. - add a new mod level `.pack_from_raise()` helper for auto-boxing RAE subtypes constructed manually into `Error`s which is normally how `StreamOverrun` and `MsgTypeError` get created in the runtime. - in support of the above expose a `src_uid: tuple` override to `pack_error()` such that the runtime can provide any remote actor id when packing a locally-created yet remotely-caused RAE subtype. - adjust all typing to expect `Error`s over `dict`-msgs. Adjust some tests to match these changes: - context and inter-peer-cancel tests to make their `.msgdata` related checks against the new `.ipc_msg` as well and `.tb_str` directly. - toss in an extra sleep to `sleep_a_bit_then_cancel_peer()` to keep the 'canceller' ctx child task cancelled by it's parent in the 'root' for the rte-raised-during-ctxc-handling case (apparently now it's returning too fast, cool?). --- tests/test_context_stream_semantics.py | 7 +- tests/test_inter_peer_cancellation.py | 10 + tractor/_exceptions.py | 418 +++++++++++++++++++------ 3 files changed, 333 insertions(+), 102 deletions(-) diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index 06a7f8c..5df133d 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -796,10 +796,12 @@ async def test_callee_cancels_before_started( # raises a special cancel signal except tractor.ContextCancelled as ce: + _ce = ce # for debug on crash ce.boxed_type == trio.Cancelled # the traceback should be informative - assert 'itself' in ce.msgdata['tb_str'] + assert 'itself' in ce.tb_str + assert ce.tb_str == ce.msgdata['tb_str'] # teardown the actor await portal.cancel_actor() @@ -1157,7 +1159,8 @@ def test_maybe_allow_overruns_stream( elif slow_side == 'parent': assert err.boxed_type == tractor.RemoteActorError - assert 'StreamOverrun' in err.msgdata['tb_str'] + assert 'StreamOverrun' in err.tb_str + assert err.tb_str == err.msgdata['tb_str'] else: # if this hits the logic blocks from above are not diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py index 470287f..aa05e3c 100644 --- a/tests/test_inter_peer_cancellation.py +++ b/tests/test_inter_peer_cancellation.py @@ -185,6 +185,10 @@ async def sleep_a_bit_then_cancel_peer( await trio.sleep(cancel_after) await peer.cancel_actor() + # such that we're cancelled by our rent ctx-task + await trio.sleep(3) + print('CANCELLER RETURNING!') + @tractor.context async def stream_ints( @@ -245,6 +249,12 @@ async def stream_from_peer( assert peer_ctx._remote_error is ctxerr assert peer_ctx._remote_error.msgdata == ctxerr.msgdata + # XXX YES, bc exact same msg instances + assert peer_ctx._remote_error._ipc_msg is ctxerr._ipc_msg + + # XXX NO, bc new one always created for property accesss + assert peer_ctx._remote_error.ipc_msg != ctxerr.ipc_msg + # the peer ctx is the canceller even though it's canceller # is the "canceller" XD assert peer_name in peer_ctx.canceller diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 28c6162..a31aa11 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -31,7 +31,10 @@ import textwrap import traceback import trio -from msgspec import structs +from msgspec import ( + structs, + defstruct, +) from tractor._state import current_actor from tractor.log import get_logger @@ -40,6 +43,8 @@ from tractor.msg import ( Msg, Stop, Yield, + pretty_struct, + types as msgtypes, ) if TYPE_CHECKING: @@ -64,21 +69,38 @@ class InternalError(RuntimeError): ''' -_body_fields: list[str] = [ - 'boxed_type', - 'src_type', - # TODO: format this better if we're going to include it. - # 'relay_path', - 'src_uid', - # only in sub-types - 'canceller', - 'sender', +# NOTE: more or less should be close to these: +# 'boxed_type', +# 'src_type', +# 'src_uid', +# 'canceller', +# 'sender', +# TODO: format this better if we're going to include it. +# 'relay_path', +# +_ipcmsg_keys: list[str] = [ + fi.name + for fi, k, v + in pretty_struct.iter_fields(Error) + ] -_msgdata_keys: list[str] = [ - 'boxed_type_str', -] + _body_fields +_body_fields: list[str] = list( + set(_ipcmsg_keys) + + # NOTE: don't show fields that either don't provide + # any extra useful info or that are already shown + # as part of `.__repr__()` output. + - { + 'src_type_str', + 'boxed_type_str', + 'tb_str', + 'relay_path', + '_msg_dict', + 'cid', + } +) def get_err_type(type_name: str) -> BaseException|None: @@ -137,7 +159,7 @@ def pformat_boxed_tb( f'|\n' f' ------ - ------\n\n' f'{tb_str}\n' - f' ------ - ------\n' + f' ------ - ------\n' f'_|\n' ) if len(indent): @@ -152,10 +174,40 @@ def pformat_boxed_tb( + body ) - # return body -# TODO: rename to just `RemoteError`? +def pack_from_raise( + local_err: ( + ContextCancelled + |StreamOverrun + |MsgTypeError + ), + cid: str, + + **rae_fields, + +) -> Error: + ''' + Raise the provided `RemoteActorError` subtype exception + instance locally to get a traceback and pack it into an IPC + `Error`-msg using `pack_error()` to extract the tb info. + + ''' + try: + raise local_err + except type(local_err) as local_err: + err_msg: dict[str, dict] = pack_error( + local_err, + cid=cid, + **rae_fields, + ) + return err_msg + + +# TODO: better compat with IPC msg structs? +# -[ ] rename to just `RemoteError` like in `mp.manager`? +# -[ ] make a `Struct`-subtype by using the .__post_init__()`? +# https://jcristharif.com/msgspec/structs.html#post-init-processing class RemoteActorError(Exception): ''' A box(ing) type which bundles a remote actor `BaseException` for @@ -170,12 +222,28 @@ class RemoteActorError(Exception): 'src_uid', # 'relay_path', ] + extra_body_fields: list[str] = [ + 'cid', + 'boxed_type', + ] def __init__( self, message: str, + ipc_msg: Error|None = None, boxed_type: Type[BaseException]|None = None, - **msgdata + + # NOTE: only provided by subtypes (ctxc and overruns) + # wishing to both manually instantiate and add field + # values defined on `Error` without having to construct an + # `Error()` before the exception is processed by + # `pack_error()`. + # + # TODO: a better way to support this without the extra + # private `._extra_msgdata`? + # -[ ] ctxc constructed inside `._rpc._invoke()` L:638 + # -[ ] overrun @ `._context.Context._deliver_msg()` L:1958 + **extra_msgdata, ) -> None: super().__init__(message) @@ -188,14 +256,24 @@ class RemoteActorError(Exception): # - .remote_type # also pertains to our long long oustanding issue XD # https://github.com/goodboy/tractor/issues/5 - # - # TODO: always set ._boxed_type` as `None` by default - # and instead render if from `.boxed_type_str`? self._boxed_type: BaseException = boxed_type self._src_type: BaseException|None = None + self._ipc_msg: Error|None = ipc_msg - # TODO: make this a `.errmsg: Error` throughout? - self.msgdata: dict[str, Any] = msgdata + if ( + extra_msgdata + and ipc_msg + ): + # XXX mutate the orig msg directly from + # manually provided input params. + for k, v in extra_msgdata.items(): + setattr( + self._ipc_msg, + k, + v, + ) + else: + self._extra_msgdata = extra_msgdata # TODO: mask out eventually or place in `pack_error()` # pre-`return` lines? @@ -214,14 +292,56 @@ class RemoteActorError(Exception): # either by customizing `ContextCancelled.__init__()` or # through a special factor func? elif boxed_type: - if not self.msgdata.get('boxed_type_str'): - self.msgdata['boxed_type_str'] = str( - type(boxed_type).__name__ - ) + boxed_type_str: str = type(boxed_type).__name__ + if ( + ipc_msg + and not self._ipc_msg.boxed_type_str + ): + self._ipc_msg.boxed_type_str = boxed_type_str + assert self.boxed_type_str == self._ipc_msg.boxed_type_str + + else: + self._extra_msgdata['boxed_type_str'] = boxed_type_str - assert self.boxed_type_str == self.msgdata['boxed_type_str'] assert self.boxed_type is boxed_type + @property + def ipc_msg(self) -> pretty_struct.Struct: + ''' + Re-render the underlying `._ipc_msg: Msg` as + a `pretty_struct.Struct` for introspection such that the + returned value is a read-only copy of the original. + + ''' + if self._ipc_msg is None: + return None + + msg_type: Msg = type(self._ipc_msg) + fields: dict[str, Any] = { + k: v for _, k, v in + pretty_struct.iter_fields(self._ipc_msg) + } + return defstruct( + msg_type.__name__, + fields=fields.keys(), + bases=(msg_type, pretty_struct.Struct), + )(**fields) + + @property + def msgdata(self) -> dict[str, Any]: + ''' + The (remote) error data provided by a merge of the + `._ipc_msg: Error` msg and any input `._extra_msgdata: dict` + (provided by subtypes via `.__init__()`). + + ''' + msgdata: dict = ( + structs.asdict(self._ipc_msg) + if self._ipc_msg + else {} + ) + return self._extra_msgdata | msgdata + @property def src_type_str(self) -> str: ''' @@ -231,7 +351,7 @@ class RemoteActorError(Exception): at the first relay/hop's receiving actor. ''' - return self.msgdata['src_type_str'] + return self._ipc_msg.src_type_str @property def src_type(self) -> str: @@ -241,7 +361,7 @@ class RemoteActorError(Exception): ''' if self._src_type is None: self._src_type = get_err_type( - self.msgdata['src_type_str'] + self._ipc_msg.src_type_str ) return self._src_type @@ -252,7 +372,7 @@ class RemoteActorError(Exception): String-name of the (last hop's) boxed error type. ''' - return self.msgdata['boxed_type_str'] + return self._ipc_msg.boxed_type_str @property def boxed_type(self) -> str: @@ -262,7 +382,7 @@ class RemoteActorError(Exception): ''' if self._boxed_type is None: self._boxed_type = get_err_type( - self.msgdata['boxed_type_str'] + self._ipc_msg.boxed_type_str ) return self._boxed_type @@ -275,40 +395,44 @@ class RemoteActorError(Exception): actor's hop. NOTE: a `list` field with the same name is expected to be - passed/updated in `.msgdata`. + passed/updated in `.ipc_msg`. ''' - return self.msgdata['relay_path'] + return self._ipc_msg.relay_path @property def relay_uid(self) -> tuple[str, str]|None: return tuple( - self.msgdata['relay_path'][-1] + self._ipc_msg.relay_path[-1] ) @property def src_uid(self) -> tuple[str, str]|None: if src_uid := ( - self.msgdata.get('src_uid') + self._ipc_msg.src_uid ): return tuple(src_uid) # TODO: use path lookup instead? # return tuple( - # self.msgdata['relay_path'][0] + # self._ipc_msg.relay_path[0] # ) @property def tb_str( self, - indent: str = ' ', + indent: str = '', ) -> str: - if remote_tb := self.msgdata.get('tb_str'): - return textwrap.indent( - remote_tb, - prefix=indent, - ) + remote_tb: str = '' - return '' + if self._ipc_msg: + remote_tb: str = self._ipc_msg.tb_str + else: + remote_tb = self.msgdata.get('tb_str') + + return textwrap.indent( + remote_tb or '', + prefix=indent, + ) def _mk_fields_str( self, @@ -320,14 +444,17 @@ class RemoteActorError(Exception): val: Any|None = ( getattr(self, key, None) or - self.msgdata.get(key) + getattr( + self._ipc_msg, + key, + None, + ) ) # TODO: for `.relay_path` on multiline? # if not isinstance(val, str): # val_str = pformat(val) # else: val_str: str = repr(val) - if val: _repr += f'{key}={val_str}{end_char}' @@ -358,7 +485,9 @@ class RemoteActorError(Exception): ''' fields: str = self._mk_fields_str( - _body_fields, + _body_fields + + + self.extra_body_fields, ) body: str = pformat_boxed_tb( tb_str=self.tb_str, @@ -415,15 +544,6 @@ class RemoteActorError(Exception): # raise NotImplementedError -class InternalActorError(RemoteActorError): - ''' - (Remote) internal `tractor` error indicating failure of some - primitive, machinery state or lowlevel task that should never - occur. - - ''' - - class ContextCancelled(RemoteActorError): ''' Inter-actor task context was cancelled by either a call to @@ -433,6 +553,10 @@ class ContextCancelled(RemoteActorError): reprol_fields: list[str] = [ 'canceller', ] + extra_body_fields: list[str] = [ + 'cid', + 'canceller', + ] @property def canceller(self) -> tuple[str, str]|None: ''' @@ -454,7 +578,7 @@ class ContextCancelled(RemoteActorError): |_`._cancel_task()` ''' - value = self.msgdata.get('canceller') + value: tuple[str, str]|None = self._ipc_msg.canceller if value: return tuple(value) @@ -468,6 +592,132 @@ class ContextCancelled(RemoteActorError): # src_actor_uid = canceller +class MsgTypeError( + RemoteActorError, +): + ''' + Equivalent of a runtime `TypeError` for IPC dialogs. + + Raise when any IPC wire-message is decoded to have invalid + field values (due to type) or for other `MsgCodec` related + violations such as having no extension-type for a field with + a custom type but no `enc/dec_hook()` support. + + Can be raised on the send or recv side of an IPC `Channel` + depending on the particular msg. + + Msgs which cause this to be raised on the `.send()` side (aka + in the "ctl" dialog phase) include: + - `Start` + - `Started` + - `Return` + + Those which cause it on on the `.recv()` side (aka the "nasty + streaming" dialog phase) are: + - `Yield` + - TODO: any embedded `.pld` type defined by user code? + + Normally the source of an error is re-raised from some `.msg._codec` + decode which itself raises in a backend interchange + lib (eg. a `msgspec.ValidationError`). + + ''' + reprol_fields: list[str] = [ + 'ipc_msg', + ] + extra_body_fields: list[str] = [ + 'cid', + 'payload_msg', + ] + + @property + def msg_dict(self) -> dict[str, Any]: + ''' + If the underlying IPC `Msg` was received from a remote + actor but was unable to be decoded to a native + `Yield`|`Started`|`Return` struct, the interchange backend + native format decoder can be used to stash a `dict` + version for introspection by the invalidating RPC task. + + ''' + return self.msgdata.get('_msg_dict') + + @property + def payload_msg(self) -> Msg|None: + ''' + Attempt to construct what would have been the original + `Msg`-with-payload subtype (i.e. an instance from the set + of msgs in `.msg.types._payload_msgs`) which failed + validation. + + ''' + msg_dict: dict = self.msg_dict.copy() + name: str = msg_dict.pop('msg_type') + msg_type: Msg = getattr( + msgtypes, + name, + Msg, + ) + return msg_type(**msg_dict) + + @property + def cid(self) -> str: + # pre-packed using `.from_decode()` constructor + return self.msgdata.get('cid') + + @classmethod + def from_decode( + cls, + message: str, + msgdict: dict, + + ) -> MsgTypeError: + return cls( + message=message, + + # NOTE: original "vanilla decode" of the msg-bytes + # is placed inside a value readable from + # `.msgdata['_msg_dict']` + _msg_dict=msgdict, + + # expand and pack all RAE compat fields + # into the `._extra_msgdata` aux `dict`. + **{ + k: v + for k, v in msgdict.items() + if k in _ipcmsg_keys + }, + ) + + +class StreamOverrun( + RemoteActorError, + trio.TooSlowError, +): + reprol_fields: list[str] = [ + 'sender', + ] + ''' + This stream was overrun by its sender and can be optionally + handled by app code using `MsgStream.send()/.receive()`. + + ''' + @property + def sender(self) -> tuple[str, str] | None: + value = self._ipc_msg.sender + if value: + return tuple(value) + + +# class InternalActorError(RemoteActorError): +# ''' +# Boxed (Remote) internal `tractor` error indicating failure of some +# primitive, machinery state or lowlevel task that should never +# occur. + +# ''' + + class TransportClosed(trio.ClosedResourceError): "Underlying channel transport was closed prior to use" @@ -484,23 +734,6 @@ class NoRuntime(RuntimeError): "The root actor has not been initialized yet" -class StreamOverrun( - RemoteActorError, - trio.TooSlowError, -): - reprol_fields: list[str] = [ - 'sender', - ] - ''' - This stream was overrun by sender - - ''' - @property - def sender(self) -> tuple[str, str] | None: - value = self.msgdata.get('sender') - if value: - return tuple(value) - class AsyncioCancelled(Exception): ''' @@ -518,23 +751,12 @@ class MessagingError(Exception): ''' -class MsgTypeError(MessagingError): - ''' - Equivalent of a `TypeError` for an IPC wire-message - due to an invalid field value (type). - - Normally this is re-raised from some `.msg._codec` - decode error raised by a backend interchange lib - like `msgspec` or `pycapnproto`. - - ''' - - def pack_error( exc: BaseException|RemoteActorError, tb: str|None = None, cid: str|None = None, + src_uid: tuple[str, str]|None = None, ) -> Error: ''' @@ -560,7 +782,8 @@ def pack_error( ): error_msg.update(exc.msgdata) - # an onion/inception we need to pack + # an onion/inception we need to pack as a nested and relayed + # remotely boxed error. if ( type(exc) is RemoteActorError and (boxed := exc.boxed_type) @@ -584,7 +807,7 @@ def pack_error( error_msg['boxed_type_str'] = 'RemoteActorError' else: - error_msg['src_uid'] = our_uid + error_msg['src_uid'] = src_uid or our_uid error_msg['src_type_str'] = type(exc).__name__ error_msg['boxed_type_str'] = type(exc).__name__ @@ -596,7 +819,7 @@ def pack_error( # XXX NOTE: always ensure the traceback-str is from the # locally raised error (**not** the prior relay's boxed - # content's `.msgdata`). + # content's in `._ipc_msg.tb_str`). error_msg['tb_str'] = tb_str if cid is not None: @@ -606,7 +829,7 @@ def pack_error( def unpack_error( - msg: dict[str, Any]|Error, + msg: Error, chan: Channel|None = None, box_type: RemoteActorError = RemoteActorError, @@ -624,16 +847,10 @@ def unpack_error( ''' __tracebackhide__: bool = hide_tb - error_dict: dict[str, dict]|None if not isinstance(msg, Error): - # if ( - # error_dict := msg.get('error') - # ) is None: - # no error field, nothing to unpack. return None - # retrieve the remote error's msg encoded details - # tb_str: str = error_dict.get('tb_str', '') + # retrieve the remote error's encoded details from fields tb_str: str = msg.tb_str message: str = ( f'{chan.uid}\n' @@ -651,6 +868,10 @@ def unpack_error( box_type = ContextCancelled assert boxed_type is box_type + elif boxed_type_str == 'MsgTypeError': + box_type = MsgTypeError + assert boxed_type is box_type + # TODO: already included by `_this_mod` in else loop right? # # we have an inception/onion-error so ensure @@ -661,12 +882,9 @@ def unpack_error( # assert len(error_dict['relay_path']) >= 1 assert len(msg.relay_path) >= 1 - # TODO: mk RAE just take the `Error` instance directly? - error_dict: dict = structs.asdict(msg) - exc = box_type( message, - **error_dict, + ipc_msg=msg, ) return exc