Refine `MsgTypeError` handling to relay-up-on-`.recv()`
Such that `Channel.recv()` + `MsgpackTCPStream.recv()` originating
msg-type-errors are not raised at the IPC transport layer but instead
relayed up the runtime stack for eventual handling by user-app code via
the `Context`/`MsgStream` layer APIs.
This design choice buys a substantial amount of flexibility and
modularity, and keeps `MsgTypeError` handling policies from being
coupled to a particular backend IPC transport layer:
- receive-side msg-type errors, as can be raised and handled in the
  `.open_stream()` "nasty" phase of a ctx, are packed at the
  `MsgCodec`/transport layer (keeping the underlying src decode error
  coupled to the specific transport + interchange lib) and then relayed
  upward to app code for custom handling like a normal `Error` msg.
- the policy options for handling such cases could be implemented as
  `@acm` wrappers around `.open_context()`/`.open_stream()` blocks (and
  their respective delivered primitives) OR just plain old async
  generators around `MsgStream.receive()` such that both built-in policy
  handling and custom user-app solutions can be swapped without touching
  any `tractor` internals or providing specialized "registry APIs".
  -> eg. the ignore and relay-invalid-msg-to-sender approaches can be
   more easily implemented as embedded `try: except MsgTypeError:`
   blocks around `MsgStream.receive()`, applied either as an injected
   wrapper type around a stream or as an async gen that `async for`s
   from the stream.
- any performance based AOT-lang extensions used to implement a policy
  for handling recv-side errors can avoid knowledge of the lower level
  IPC `Channel` (and-downward) primitives.
- `Context` consuming code can choose to let all msg-type-errs
  bubble and handle them manually (like any other remote `Error`
  shuttled exception).
- we can keep (as before) raising send-side msg type check failures
  locally, causing offending senders to error and adjust before the
  streaming phase of an IPC ctx.
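
A minimal sketch of the "plain old async generator" policy idea from the
list above. `FakeStream`, `EndOfStream` and the local `MsgTypeError` are
hypothetical stand-ins (not `tractor` APIs) so the example runs on the
stdlib `asyncio` alone; in real code the wrapper would presumably sit
around `MsgStream.receive()` inside a `trio` task.

```python
import asyncio
from typing import Any, AsyncIterator


class MsgTypeError(Exception):
    """Local stand-in for `tractor.MsgTypeError` (hypothetical)."""


class EndOfStream(Exception):
    """Local stand-in for an end-of-stream signal (hypothetical)."""


class FakeStream:
    """Hypothetical stand-in for a `MsgStream`: `receive()` returns the
    next queued msg, or raises it if it is an exception instance."""

    def __init__(self, items: list[Any]) -> None:
        self._items = list(items)

    async def receive(self) -> Any:
        if not self._items:
            raise EndOfStream
        item = self._items.pop(0)
        if isinstance(item, Exception):
            raise item
        return item


async def ignore_invalid(
    stream: FakeStream,
    dropped: list[MsgTypeError],
) -> AsyncIterator[Any]:
    """Policy as a plain async gen: skip (but record) invalid msgs."""
    while True:
        try:
            msg = await stream.receive()
        except MsgTypeError as mte:
            # a relay-invalid-msg-to-sender policy would send back here
            dropped.append(mte)
            continue
        except EndOfStream:
            return
        yield msg


async def main() -> list[Any]:
    dropped: list[MsgTypeError] = []
    stream = FakeStream([1, MsgTypeError('bad payload'), 2])
    valid = [msg async for msg in ignore_invalid(stream, dropped)]
    print(valid, len(dropped))
    return valid


if __name__ == '__main__':
    asyncio.run(main())
```

Since the policy is just a generator wrapped around the stream, swapping
"ignore" for "relay back" or "re-raise" only changes the `except` body
and never touches the transport layer.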
Impl (related) deats:
- obvi make `MsgpackTCPStream.recv()` yield up any `MsgTypeError`
  constructed by `_mk_msg_type_err()` such that the exception will
  eventually be relayed up to `._rpc.process_messages()` and from
  there delivered to the corresponding ctx-task.
- in support of ^, make `Channel.recv()` detect said mtes and use the
  new `pack_from_raise()` to inject the far end `Actor.uid` for the
  `Error.src_uid`.
- keep raising the send side equivalent (when strict enabled) errors
  inline immediately with no upward `Error` packing or relay.
- improve `_mk_msg_type_err()` cases handling with far more detailed
  `MsgTypeError` "message" contents pertaining to `msgspec` specific
  failure-fixing-tips and type-spec mismatch info:
  * use `.from_decode()` constructor in recv-side case to inject the
    non-spec decoded `msg_dict: dict` and use the new
    `MsgCodec.pld_spec_str: str` when clarifying the type discrepancy
    with the offending field.
  * on send-side, if we detect that an unsupported field type was
    described in the original `src_type_error`, AND there is no
    `msgpack.Encoder.enc_hook()` set, hint that the real issue is
    likely that the user needs to extend the codec to support the
    non-std/custom type with a hook, and link to the `msgspec` docs.
  * if either a `src_type_error` or `src_validation_error` is provided,
    set that error as the `.__cause__` in the new mte.
			
			
branch: runtime_to_msgspec
parent: ae42b91384
commit: 62bb11975f

 tractor/_ipc.py | 163
@@ -38,7 +38,6 @@ from typing import (
     Protocol,
     Type,
     TypeVar,
-    Union,
 )
 
 import msgspec
@@ -47,8 +46,9 @@ import trio
 
 from tractor.log import get_logger
 from tractor._exceptions import (
-    TransportClosed,
     MsgTypeError,
+    pack_from_raise,
+    TransportClosed,
 )
 from tractor.msg import (
     _ctxvar_MsgCodec,
@@ -118,40 +118,75 @@ class MsgTransport(Protocol[MsgType]):
         ...
 
 
-def _raise_msg_type_err(
+def _mk_msg_type_err(
     msg: Any|bytes,
     codec: MsgCodec,
-    validation_err: msgspec.ValidationError|None = None,
+
+    message: str|None = None,
     verb_header: str = '',
 
-) -> None:
+    src_validation_error: msgspec.ValidationError|None = None,
+    src_type_error: TypeError|None = None,
 
-    # if side == 'send':
-    if validation_err is None:  # send-side
+) -> MsgTypeError:
 
-        import traceback
-        from tractor._exceptions import pformat_boxed_tb
+    # `Channel.send()` case
+    if src_validation_error is None:  # send-side
 
-        fmt_spec: str = '\n'.join(
-            map(str, codec.msg_spec.__args__)
-        )
-        fmt_stack: str = (
-            '\n'.join(traceback.format_stack(limit=3))
-        )
-        tb_fmt: str = pformat_boxed_tb(
-            tb_str=fmt_stack,
-            # fields_str=header,
-            field_prefix='  ',
-            indent='',
-        )
-        raise MsgTypeError(
-            f'invalid msg -> {msg}: {type(msg)}\n\n'
-            f'{tb_fmt}\n'
-            f'Valid IPC msgs are:\n\n'
-            # f'  ------ - ------\n'
-            f'{fmt_spec}\n'
-        )
+        # no src error from `msgspec.msgpack.Decoder.decode()` so
+        # prolly a manual type-check on our part.
+        if message is None:
+            import traceback
+            from tractor._exceptions import pformat_boxed_tb
+
+            fmt_spec: str = '\n'.join(
+                map(str, codec.msg_spec.__args__)
+            )
+            fmt_stack: str = (
+                '\n'.join(traceback.format_stack(limit=3))
+            )
+            tb_fmt: str = pformat_boxed_tb(
+                tb_str=fmt_stack,
+                # fields_str=header,
+                field_prefix='  ',
+                indent='',
+            )
+            message: str = (
+                f'invalid msg -> {msg}: {type(msg)}\n\n'
+                f'{tb_fmt}\n'
+                f'Valid IPC msgs are:\n\n'
+                # f'  ------ - ------\n'
+                f'{fmt_spec}\n'
+            )
+        elif src_type_error:
+            src_message: str = str(src_type_error)
+            patt: str = 'type '
+            type_idx: int = src_message.find('type ')
+            invalid_type: str = src_message[type_idx + len(patt):].split()[0]
+
+            enc_hook: Callable|None = codec.enc.enc_hook
+            if enc_hook is None:
+                message += (
+                    '\n\n'
+
+                    f"The current IPC-msg codec can't encode type `{invalid_type}` !\n"
+                    f'Maybe a `msgpack.Encoder.enc_hook()` extension is needed?\n\n'
+
+                    f'Check the `msgspec` docs for ad-hoc type extending:\n'
+                    '|_ https://jcristharif.com/msgspec/extending.html\n'
+                    '|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n'
+                )
+
+
+        msgtyperr = MsgTypeError(
+            message=message,
+            ipc_msg=msg,
+        )
+        # ya, might be `None`
+        msgtyperr.__cause__ = src_type_error
+        return msgtyperr
 
+    # `Channel.recv()` case
     else:
         # decode the msg-bytes using the std msgpack
         # interchange-prot (i.e. without any
@@ -161,29 +196,31 @@ def _raise_msg_type_err(
         msg_dict: dict = msgspec.msgpack.decode(msg)
         msg_type_name: str = msg_dict['msg_type']
         msg_type = getattr(msgtypes, msg_type_name)
-        errmsg: str = (
+        message: str = (
             f'invalid `{msg_type_name}` IPC msg\n\n'
         )
         if verb_header:
-            errmsg = f'{verb_header} ' + errmsg
+            message = f'{verb_header} ' + message
 
         # XXX see if we can determine the exact invalid field
         # such that we can comprehensively report the
         # specific field's type problem
-        msgspec_msg: str = validation_err.args[0].rstrip('`')
+        msgspec_msg: str = src_validation_error.args[0].rstrip('`')
         msg, _, maybe_field = msgspec_msg.rpartition('$.')
         obj = object()
         if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
-            field_type: Union[Type] = msg_type.__signature__.parameters[
-                maybe_field
-            ].annotation
-            errmsg += (
+            message += (
                 f'{msg.rstrip("`")}\n\n'
                 f'{msg_type}\n'
-                f' |_.{maybe_field}: {field_type} = {field_val!r}\n'
+                f' |_.{maybe_field}: {codec.pld_spec_str} = {field_val!r}\n'
             )
 
-        raise MsgTypeError(errmsg) from validation_err
+        msgtyperr = MsgTypeError.from_decode(
+            message=message,
+            msgdict=msg_dict,
+        )
+        msgtyperr.__cause__ = src_validation_error
+        return msgtyperr
 
 
 # TODO: not sure why we have to inherit here, but it seems to be an
@@ -325,12 +362,15 @@ class MsgpackTCPStream(MsgTransport):
             # and always raise such that spec violations
             # are never allowed to be caught silently!
             except msgspec.ValidationError as verr:
-                # re-raise as type error
-                _raise_msg_type_err(
+                msgtyperr: MsgTypeError = _mk_msg_type_err(
                     msg=msg_bytes,
                     codec=codec,
-                    validation_err=verr,
+                    src_validation_error=verr,
                 )
+                # XXX deliver up to `Channel.recv()` where
+                # a re-raise and `Error`-pack can inject the far
+                # end actor `.uid`.
+                yield msgtyperr
 
             except (
                 msgspec.DecodeError,
@@ -387,7 +427,7 @@ class MsgpackTCPStream(MsgTransport):
 
             if type(msg) not in msgtypes.__msg_types__:
                 if strict_types:
-                    _raise_msg_type_err(
+                    raise _mk_msg_type_err(
                         msg,
                         codec=codec,
                     )
@@ -400,11 +440,16 @@ class MsgpackTCPStream(MsgTransport):
             try:
                 bytes_data: bytes = codec.encode(msg)
             except TypeError as typerr:
-                raise MsgTypeError(
-                    'A msg field violates the current spec\n'
-                    f'{codec.pld_spec}\n\n'
-                    f'{pretty_struct.Struct.pformat(msg)}'
-                ) from typerr
+                msgtyperr: MsgTypeError = _mk_msg_type_err(
+                    msg,
+                    codec=codec,
+                    message=(
+                        f'IPC-msg-spec violation in\n\n'
+                        f'{pretty_struct.Struct.pformat(msg)}'
+                    ),
+                    src_type_error=typerr,
+                )
+                raise msgtyperr from typerr
 
             # supposedly the fastest says,
             # https://stackoverflow.com/a/54027962
@@ -719,13 +764,35 @@ class Channel:
         assert self._transport
         while True:
             try:
-                async for item in self._transport:
-                    yield item
+                async for msg in self._transport:
+                    match msg:
+                        # NOTE: if transport/interchange delivers
+                        # a type error, we pack it with the far
+                        # end peer `Actor.uid` and relay the
+                        # `Error`-msg upward to the `._rpc` stack
+                        # for normal RAE handling.
+                        case MsgTypeError():
+                            yield pack_from_raise(
+                                local_err=msg,
+                                cid=msg.cid,
+
+                                # XXX we pack it here bc lower
+                                # layers have no notion of an
+                                # actor-id ;)
+                                src_uid=self.uid,
+                            )
+                        case _:
+                            yield msg
+
+                    # TODO: if we were gonna do this it should be
+                    # done up at the `MsgStream` layer!
+                    #
                     # sent = yield item
                     # if sent is not None:
                     #     # optimization, passing None through all the
                     #     # time is pointless
                     #     await self._transport.send(sent)
+
             except trio.BrokenResourceError:
 
                 # if not self._autorecon: