Compare commits

..

4 Commits

Author SHA1 Message Date
Tyler Goodlet eec240a70a Tweak some `pformat_boxed_tb()` indent inputs
- add some `tb_str: str` indent-prefix args for diff indent levels for the
body vs. the surrounding "ascii box".
- ^-use it-^ from `RemoteActorError.__repr()__` obvi.
- use new `msg.types.from_dict_msg()` in impl of
  `MsgTypeError.payload_msg`, handy for showing what the message "would
  have looked like in `Struct` form" had it not failed it's type
  constraints.
2024-04-11 21:24:02 -04:00
Tyler Goodlet 322e015d32 Add custom `MsgCodec.__repr__()`
Sure makes console grokability a lot better by showing only the
customizeable fields.

Further, clean up `mk_codec()` a bunch by removing the `ipc_msg_spec`
param since we don't plan to support another msg-set (for now) which
allows cleaning out a buncha logic that was mostly just a source of
bugs..

Also,
- add temporary `log.info()` around codec application.
- throw in some sanity `assert`s to `limit_msg_spec()`.
- add but mask out the `extend_msg_spec()` idea since it seems `msgspec`
  won't allow `Decoder.type` extensions when using a custom `dec_hook()`
  for some extension type.. (not sure what approach to take here yet).
2024-04-11 21:04:48 -04:00
Tyler Goodlet dbc445ff9d Expose `tractor.msg.PayloadT` from subpkg 2024-04-11 20:42:54 -04:00
Tyler Goodlet 7aaa2a61ec Add msg-from-dict constructor helper
Handy for re-constructing a struct-`MsgType` from a `dict` decoded from
wire-bytes wherein the msg failed to decode normally due to a field type
error but you'd still like to show the "potential" msg in struct form,
say inside a `MsgTypeError`'s meta data.

Supporting deats:
- add a `.msg.types.from_dict_msg()` to implement it (the helper).
- also a `.msg.types._msg_table: dict[str, MsgType]` for supporting this
  func ^ as well as providing just a general `MsgType`-by-`str`-name
  lookup.

Unrelated:
- Drop commented idea for still supporting `dict`-msg set via
  `enc/dec_hook()`s that would translate to/from `MsgType`s, but that
  would require a duplicate impl in the runtime.. so eff that XD
2024-04-11 20:23:55 -04:00
5 changed files with 171 additions and 160 deletions

View File

@ -40,7 +40,7 @@ from tractor._state import current_actor
from tractor.log import get_logger from tractor.log import get_logger
from tractor.msg import ( from tractor.msg import (
Error, Error,
Msg, MsgType,
Stop, Stop,
Yield, Yield,
pretty_struct, pretty_struct,
@ -130,7 +130,10 @@ def pformat_boxed_tb(
tb_str: str, tb_str: str,
fields_str: str|None = None, fields_str: str|None = None,
field_prefix: str = ' |_', field_prefix: str = ' |_',
indent: str = ' '*2
tb_box_indent: int|None = None,
tb_body_indent: int = 1,
) -> str: ) -> str:
if ( if (
fields_str fields_str
@ -139,15 +142,19 @@ def pformat_boxed_tb(
): ):
fields: str = textwrap.indent( fields: str = textwrap.indent(
fields_str, fields_str,
# prefix=' '*2,
# prefix=' |_',
prefix=field_prefix, prefix=field_prefix,
) )
else: else:
fields = fields_str or '' fields = fields_str or ''
# body_indent: str = len(field_prefix) * ' ' tb_body = tb_str
body: str = ( if tb_body_indent:
tb_body: str = textwrap.indent(
tb_str,
prefix=tb_body_indent * ' ',
)
tb_box: str = (
# orig # orig
# f' |\n' # f' |\n'
@ -158,21 +165,29 @@ def pformat_boxed_tb(
f'|\n' f'|\n'
f' ------ - ------\n\n' f' ------ - ------\n\n'
f'{tb_str}\n' # f'{tb_str}\n'
f'{tb_body}'
f' ------ - ------\n' f' ------ - ------\n'
f'_|\n' f'_|\n'
) )
if len(indent): tb_box_indent: str = (
body: str = textwrap.indent( tb_box_indent
body, or
# prefix=body_indent, 1
prefix=indent,
# (len(field_prefix))
# ? ^-TODO-^ ? if you wanted another indent level
)
if tb_box_indent > 0:
tb_box: str = textwrap.indent(
tb_box,
prefix=tb_box_indent * ' ',
) )
return ( return (
fields fields
+ +
body tb_box
) )
@ -316,7 +331,7 @@ class RemoteActorError(Exception):
if self._ipc_msg is None: if self._ipc_msg is None:
return None return None
msg_type: Msg = type(self._ipc_msg) msg_type: MsgType = type(self._ipc_msg)
fields: dict[str, Any] = { fields: dict[str, Any] = {
k: v for _, k, v in k: v for _, k, v in
pretty_struct.iter_fields(self._ipc_msg) pretty_struct.iter_fields(self._ipc_msg)
@ -493,7 +508,10 @@ class RemoteActorError(Exception):
tb_str=self.tb_str, tb_str=self.tb_str,
fields_str=fields, fields_str=fields,
field_prefix=' |_', field_prefix=' |_',
indent=' ', # no indent? # ^- is so that it's placed like so,
# just after <Type(
# |___ ..
tb_body_indent=1,
) )
return ( return (
f'<{type(self).__name__}(\n' f'<{type(self).__name__}(\n'
@ -623,7 +641,7 @@ class MsgTypeError(
''' '''
reprol_fields: list[str] = [ reprol_fields: list[str] = [
'ipc_msg', 'payload_msg',
] ]
extra_body_fields: list[str] = [ extra_body_fields: list[str] = [
'cid', 'cid',
@ -633,7 +651,7 @@ class MsgTypeError(
@property @property
def msg_dict(self) -> dict[str, Any]: def msg_dict(self) -> dict[str, Any]:
''' '''
If the underlying IPC `Msg` was received from a remote If the underlying IPC `MsgType` was received from a remote
actor but was unable to be decoded to a native actor but was unable to be decoded to a native
`Yield`|`Started`|`Return` struct, the interchange backend `Yield`|`Started`|`Return` struct, the interchange backend
native format decoder can be used to stash a `dict` native format decoder can be used to stash a `dict`
@ -643,22 +661,21 @@ class MsgTypeError(
return self.msgdata.get('_msg_dict') return self.msgdata.get('_msg_dict')
@property @property
def payload_msg(self) -> Msg|None: def payload_msg(
self,
) -> MsgType|None:
''' '''
Attempt to construct what would have been the original Attempt to construct what would have been the original
`Msg`-with-payload subtype (i.e. an instance from the set `MsgType`-with-payload subtype (i.e. an instance from the set
of msgs in `.msg.types._payload_msgs`) which failed of msgs in `.msg.types._payload_msgs`) which failed
validation. validation.
''' '''
msg_dict: dict = self.msg_dict.copy() if msg_dict := self.msg_dict.copy():
name: str = msg_dict.pop('msg_type') return msgtypes.from_dict_msg(
msg_type: Msg = getattr( dict_msg=msg_dict,
msgtypes,
name,
Msg,
) )
return msg_type(**msg_dict) return None
@property @property
def cid(self) -> str: def cid(self) -> str:
@ -908,7 +925,7 @@ def is_multi_cancelled(exc: BaseException) -> bool:
def _raise_from_no_key_in_msg( def _raise_from_no_key_in_msg(
ctx: Context, ctx: Context,
msg: Msg, msg: MsgType,
src_err: KeyError, src_err: KeyError,
log: StackLevelAdapter, # caller specific `log` obj log: StackLevelAdapter, # caller specific `log` obj

View File

@ -53,6 +53,9 @@ from .types import (
Error as Error, Error as Error,
# type-var for `.pld` field
PayloadT as PayloadT,
# full msg class set from above as list # full msg class set from above as list
__msg_types__ as __msg_types__, __msg_types__ as __msg_types__,

View File

@ -37,6 +37,7 @@ from contextlib import (
# ContextVar, # ContextVar,
# Token, # Token,
# ) # )
import textwrap
from typing import ( from typing import (
Any, Any,
Callable, Callable,
@ -59,7 +60,9 @@ from tractor.msg.types import (
mk_msg_spec, mk_msg_spec,
MsgType, MsgType,
) )
from tractor.log import get_logger
log = get_logger(__name__)
# TODO: overall IPC msg-spec features (i.e. in this mod)! # TODO: overall IPC msg-spec features (i.e. in this mod)!
# #
@ -87,6 +90,27 @@ class MsgCodec(Struct):
pld_spec: Union[Type[Struct]]|None pld_spec: Union[Type[Struct]]|None
def __repr__(self) -> str:
speclines: str = textwrap.indent(
self.pformat_msg_spec(),
prefix=' '*3,
)
body: str = textwrap.indent(
f'|_lib = {self.lib.__name__!r}\n'
f'|_enc_hook: {self.enc.enc_hook}\n'
f'|_dec_hook: {self.dec.dec_hook}\n'
f'|_pld_spec: {self.pld_spec_str}\n'
# f'|\n'
f'|__msg_spec__:\n'
f'{speclines}\n',
prefix=' '*2,
)
return (
f'<{type(self).__name__}(\n'
f'{body}'
')>'
)
@property @property
def pld_spec_str(self) -> str: def pld_spec_str(self) -> str:
spec: Union[Type]|Type = self.pld_spec spec: Union[Type]|Type = self.pld_spec
@ -163,8 +187,8 @@ class MsgCodec(Struct):
) -> bytes: ) -> bytes:
''' '''
Encode input python objects to `msgpack` bytes for transfer Encode input python objects to `msgpack` bytes for
on a tranport protocol connection. transfer on a tranport protocol connection.
''' '''
return self._enc.encode(py_obj) return self._enc.encode(py_obj)
@ -325,15 +349,9 @@ class MsgCodec(Struct):
def mk_codec( def mk_codec(
ipc_msg_spec: Union[Type[Struct]]|Any|None = None,
#
# ^TODO^: in the long run, do we want to allow using a diff IPC `Msg`-set?
# it would break the runtime, but maybe say if you wanted
# to add some kinda field-specific or wholesale `.pld` ecryption?
# struct type unions set for `Decoder` # struct type unions set for `Decoder`
# https://jcristharif.com/msgspec/structs.html#tagged-unions # https://jcristharif.com/msgspec/structs.html#tagged-unions
ipc_pld_spec: Union[Type[Struct]]|Any|None = None, ipc_pld_spec: Union[Type[Struct]]|Any = Any,
# TODO: offering a per-msg(-field) type-spec such that # TODO: offering a per-msg(-field) type-spec such that
# the fields can be dynamically NOT decoded and left as `Raw` # the fields can be dynamically NOT decoded and left as `Raw`
@ -352,7 +370,6 @@ def mk_codec(
dec_hook: Callable|None = None, dec_hook: Callable|None = None,
enc_hook: Callable|None = None, enc_hook: Callable|None = None,
# ------ - ------ # ------ - ------
**kwargs,
# #
# Encoder: # Encoder:
# write_buffer_size=write_buffer_size, # write_buffer_size=write_buffer_size,
@ -367,28 +384,6 @@ def mk_codec(
`msgspec` ;). `msgspec` ;).
''' '''
if (
ipc_msg_spec is not None
and ipc_pld_spec
):
raise RuntimeError(
f'If a payload spec is provided,\n'
"the builtin SC-shuttle-protocol's msg set\n"
f'(i.e. a `{MsgType}`) MUST be used!\n\n'
f'However both values were passed as => mk_codec(\n'
f' ipc_msg_spec={ipc_msg_spec}`\n'
f' ipc_pld_spec={ipc_pld_spec}`\n)\n'
)
elif (
ipc_pld_spec
and
# XXX required for now (or maybe forever?) until
# we can dream up a way to allow parameterizing and/or
# custom overrides to the `Msg`-spec protocol itself?
ipc_msg_spec is None
):
# (manually) generate a msg-payload-spec for all relevant # (manually) generate a msg-payload-spec for all relevant
# god-boxing-msg subtypes, parameterizing the `Msg.pld: PayloadT` # god-boxing-msg subtypes, parameterizing the `Msg.pld: PayloadT`
# for the decoder such that all sub-type msgs in our SCIPP # for the decoder such that all sub-type msgs in our SCIPP
@ -403,9 +398,6 @@ def mk_codec(
assert len(ipc_msg_spec.__args__) == len(msg_types) assert len(ipc_msg_spec.__args__) == len(msg_types)
assert ipc_msg_spec assert ipc_msg_spec
else:
ipc_msg_spec = ipc_msg_spec or Any
enc = msgpack.Encoder( enc = msgpack.Encoder(
enc_hook=enc_hook, enc_hook=enc_hook,
) )
@ -418,8 +410,6 @@ def mk_codec(
_enc=enc, _enc=enc,
_dec=dec, _dec=dec,
pld_spec=ipc_pld_spec, pld_spec=ipc_pld_spec,
# payload_msg_specs=payload_msg_specs,
# **kwargs,
) )
# sanity on expected backend support # sanity on expected backend support
@ -500,8 +490,16 @@ def apply_codec(
- https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
''' '''
__tracebackhide__: bool = True
orig: MsgCodec = _ctxvar_MsgCodec.get() orig: MsgCodec = _ctxvar_MsgCodec.get()
assert orig is not codec assert orig is not codec
if codec.pld_spec is None:
breakpoint()
log.info(
'Applying new msg-spec codec\n\n'
f'{codec}\n'
)
token: RunVarToken = _ctxvar_MsgCodec.set(codec) token: RunVarToken = _ctxvar_MsgCodec.set(codec)
# TODO: for TreeVar approach, see docs for @cm `.being()` API: # TODO: for TreeVar approach, see docs for @cm `.being()` API:
@ -518,7 +516,10 @@ def apply_codec(
_ctxvar_MsgCodec.reset(token) _ctxvar_MsgCodec.reset(token)
assert _ctxvar_MsgCodec.get() is orig assert _ctxvar_MsgCodec.get() is orig
log.info(
'Reverted to last msg-spec codec\n\n'
f'{orig}\n'
)
def current_codec() -> MsgCodec: def current_codec() -> MsgCodec:
''' '''
@ -532,14 +533,15 @@ def current_codec() -> MsgCodec:
@cm @cm
def limit_msg_spec( def limit_msg_spec(
payload_types: Union[Type[Struct]], payload_spec: Union[Type[Struct]],
# TODO: don't need this approach right? # TODO: don't need this approach right?
# -> related to the `MsgCodec._payload_decs` stuff above.. # -> related to the `MsgCodec._payload_decs` stuff above..
# tagged_structs: list[Struct]|None = None, # tagged_structs: list[Struct]|None = None,
**codec_kwargs, **codec_kwargs,
):
) -> MsgCodec:
''' '''
Apply a `MsgCodec` that will natively decode the SC-msg set's Apply a `MsgCodec` that will natively decode the SC-msg set's
`Msg.pld: Union[Type[Struct]]` payload fields using `Msg.pld: Union[Type[Struct]]` payload fields using
@ -547,10 +549,37 @@ def limit_msg_spec(
for all IPC contexts in use by the current `trio.Task`. for all IPC contexts in use by the current `trio.Task`.
''' '''
__tracebackhide__: bool = True
curr_codec = current_codec()
msgspec_codec: MsgCodec = mk_codec( msgspec_codec: MsgCodec = mk_codec(
payload_types=payload_types, ipc_pld_spec=payload_spec,
**codec_kwargs, **codec_kwargs,
) )
with apply_codec(msgspec_codec) as applied_codec: with apply_codec(msgspec_codec) as applied_codec:
assert applied_codec is msgspec_codec assert applied_codec is msgspec_codec
yield msgspec_codec yield msgspec_codec
assert curr_codec is current_codec()
# XXX: msgspec won't allow this with non-struct custom types
# like `NamespacePath`!@!
# @cm
# def extend_msg_spec(
# payload_spec: Union[Type[Struct]],
# ) -> MsgCodec:
# '''
# Extend the current `MsgCodec.pld_spec` (type set) by extending
# the payload spec to **include** the types specified by
# `payload_spec`.
# '''
# codec: MsgCodec = current_codec()
# pld_spec: Union[Type] = codec.pld_spec
# extended_spec: Union[Type] = pld_spec|payload_spec
# with limit_msg_spec(payload_types=extended_spec) as ext_codec:
# # import pdbp; pdbp.set_trace()
# assert ext_codec.pld_spec == extended_spec
# yield ext_codec

View File

@ -140,6 +140,7 @@ class Struct(
return sin_props return sin_props
# TODO: make thisi a mod-func!
def pformat( def pformat(
self, self,
field_indent: int = 2, field_indent: int = 2,

View File

@ -447,6 +447,29 @@ class Error(
_msg_dict: dict|None = None _msg_dict: dict|None = None
def from_dict_msg(
dict_msg: dict,
msgT: MsgType|None = None,
tag_field: str = 'msg_type'
) -> MsgType:
'''
Helper to build a specific `MsgType` struct from
a "vanilla" decoded `dict`-ified equivalent of the
msg: i.e. if the `msgpack.Decoder.type == Any`.
'''
msg_type_tag_field: str = (
msgT.__struct_config__.tag_field
if msgT is not None
else tag_field
)
# XXX ensure tag field is removed
msgT_name: str = dict_msg.pop(msg_type_tag_field)
msgT: MsgType = _msg_table[msgT_name]
return msgT(**dict_msg)
# TODO: should be make a msg version of `ContextCancelled?` # TODO: should be make a msg version of `ContextCancelled?`
# and/or with a scope field or a full `ActorCancelled`? # and/or with a scope field or a full `ActorCancelled`?
# class Cancelled(Msg): # class Cancelled(Msg):
@ -498,12 +521,18 @@ _payload_msgs: list[Msg] = [
# built-in SC shuttle protocol msg type set in # built-in SC shuttle protocol msg type set in
# approx order of the IPC txn-state spaces. # approx order of the IPC txn-state spaces.
__msg_types__: list[Msg] = ( __msg_types__: list[MsgType] = (
_runtime_msgs _runtime_msgs
+ +
_payload_msgs _payload_msgs
) )
_msg_table: dict[str, MsgType] = {
msgT.__name__: msgT
for msgT in __msg_types__
}
# TODO: use new type declaration syntax for msg-type-spec # TODO: use new type declaration syntax for msg-type-spec
# https://docs.python.org/3/library/typing.html#type-aliases # https://docs.python.org/3/library/typing.html#type-aliases
# https://docs.python.org/3/reference/simple_stmts.html#type # https://docs.python.org/3/reference/simple_stmts.html#type
@ -660,6 +689,11 @@ def mk_msg_spec(
'Generating new IPC msg-spec\n' 'Generating new IPC msg-spec\n'
f'{ipc_spec}\n' f'{ipc_spec}\n'
) )
assert (
ipc_spec
and
ipc_spec is not Any
)
return ( return (
ipc_spec, ipc_spec,
msgtypes_table[spec_build_method] + ipc_msg_types, msgtypes_table[spec_build_method] + ipc_msg_types,
@ -679,88 +713,15 @@ def mk_msg_spec(
# manual convertion from our above native `Msg` set # manual convertion from our above native `Msg` set
# to `dict` equivalent (wire msgs) in order to keep legacy compat # to `dict` equivalent (wire msgs) in order to keep legacy compat
# with the original runtime implementation. # with the original runtime implementation.
#
# Note: this is is/was primarly used while moving the core # Note: this is is/was primarly used while moving the core
# runtime over to using native `Msg`-struct types wherein we # runtime over to using native `Msg`-struct types wherein we
# start with the send side emitting without loading # start with the send side emitting without loading
# a typed-decoder and then later flipping the switch over to # a typed-decoder and then later flipping the switch over to
# load to the native struct types once all runtime usage has # load to the native struct types once all runtime usage has
# been adjusted appropriately. # been adjusted appropriately.
#
# ''' # '''
# def enc_to_dict(msg: Any) -> Any:
# '''
# Encode `Msg`-structs to `dict` msgs instead
# of using `msgspec.msgpack.Decoder.type`-ed
# features.
# '''
# match msg:
# case Start():
# dctmsg: dict = pretty_struct.Struct.to_dict(
# msg
# )['pld']
# case Error():
# dctmsg: dict = pretty_struct.Struct.to_dict(
# msg
# )['pld']
# return {'error': dctmsg}
# def dec_from_dict(
# type: Type,
# obj: Any,
# ) -> Any:
# '''
# Decode to `Msg`-structs from `dict` msgs instead
# of using `msgspec.msgpack.Decoder.type`-ed
# features.
# '''
# cid: str = obj.get('cid')
# match obj:
# case {'cmd': pld}:
# return Start(
# cid=cid,
# pld=pld,
# )
# case {'functype': pld}:
# return StartAck(
# cid=cid,
# functype=pld,
# # pld=IpcCtxSpec(
# # functype=pld,
# # ),
# )
# case {'started': pld}:
# return Started(
# cid=cid,
# pld=pld,
# )
# case {'yield': pld}:
# return Yield(
# cid=obj['cid'],
# pld=pld,
# )
# case {'stop': pld}:
# return Stop(
# cid=cid,
# )
# case {'return': pld}:
# return Return(
# cid=cid,
# pld=pld,
# )
# case {'error': pld}:
# return Error(
# cid=cid,
# pld=ErrorData(
# **pld
# ),
# )
# return ( # return (
# # enc_to_dict, # # enc_to_dict,
# dec_from_dict, # dec_from_dict,