From 49aeae4150dd97456a406333eeb995dea8fdf73f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 7 Mar 2025 14:13:36 -0500 Subject: [PATCH 01/19] Finally get type-extended `msgspec` fields working By using our new `PldRx` design we can, - pass through the pld-spec & a `dec_hook()` to our `MsgDec` which is used to configure the underlying `.dec: msgspec.msgpack.Decoder` - pass through an `enc_hook()` to `mk_codec()` and use it to conf the equiv `MsgCodec.enc` such that sent msg-plds are converted prior to transport. The trick ended up being just to always union the `mk_dec()` extension-types spec with the `msgspec.Raw` pld-spec such that the `dec_hook()` is only invoked for payload types tagged by the encoder/sender side B) A variety of impl tweaks to make it all happen as well as various cleanups in the `.msg._codec` mod include, - `mk_dec()` no default `spec` arg, better doc string, accept the new `ext_types` arg, doing the union of that with `msgspec.Raw`. - proto-ed a now unused `mk_boxed_ext_struct()` which will likely get removed since it ended up that our `PayloadMsg` structs already cover the ext-type-hook requirement that the decoder is passed a `.type=msgspec.Struct` of some sort in order for `.dec_hook` to be used. - add a `unpack_spec_types()` util fn for getting the `set[Type]` from a `Union[Type]` annotation instance. - mk the default `mk_codec(ipc_pld_spec = Raw,)` since the `PldRx` design was already passing/overriding it and it doesn't make much sense to use `Any` anymore for the same reason; it will cause various `Context` apis to now break. |_ also accept an `enc_hook()` and `ext_types` which are used to maybe config the `.msgpack.Encoder` - generally tweak a bunch of comments-as-docs and todos namely the ones that are completed after the pld-rx design was implemented. Also, - mask the non-functioning `'defstruct'` approach inside `.msg.types.mk_msg_spec()` to prep for its removal. 
Adjust the test suite (rn called `test_caps_based_msging`), - add a new suite `test_custom_extension_types` and move and use the `enc/dec_nsp()` hooks to the mod level for its use. - prolly planning to drop the `test_limit_msgspec` suite since it's mostly replaced by the `test_pldrx_limiting` mod's version? - originally was tweaking a bunch in `test_codec_hooks_mod` but likely it will get mostly rewritten to be simpler and simply verify that ext-typed fields can be used over IPC `Context`s between actors (as originally intended for this sub-suite). --- tests/test_caps_based_msging.py | 359 +++++++++++++++++++++++--------- tractor/msg/__init__.py | 1 + tractor/msg/_codec.py | 273 +++++++++++++++++++----- tractor/msg/_ops.py | 10 +- tractor/msg/types.py | 45 ++-- 5 files changed, 513 insertions(+), 175 deletions(-) diff --git a/tests/test_caps_based_msging.py b/tests/test_caps_based_msging.py index ba2bb101..3c2a73cb 100644 --- a/tests/test_caps_based_msging.py +++ b/tests/test_caps_based_msging.py @@ -15,13 +15,16 @@ from typing import ( from msgspec import ( structs, msgpack, + Raw, Struct, ValidationError, ) import pytest +import trio import tractor from tractor import ( + Actor, _state, MsgTypeError, Context, @@ -32,7 +35,9 @@ from tractor.msg import ( NamespacePath, MsgCodec, + MsgDec, mk_codec, + mk_dec, apply_codec, current_codec, ) @@ -43,101 +48,34 @@ from tractor.msg.types import ( Started, mk_msg_spec, ) -import trio +from tractor.msg._ops import ( + limit_plds, +) def mk_custom_codec( - pld_spec: Union[Type]|Any, add_hooks: bool, -) -> MsgCodec: +) -> tuple[ + MsgCodec, # encode to send + MsgDec, # pld receive-n-decode +]: ''' Create custom `msgpack` enc/dec-hooks and set a `Decoder` which only loads `pld_spec` (like `NamespacePath`) types. 
''' - uid: tuple[str, str] = tractor.current_actor().uid # XXX NOTE XXX: despite defining `NamespacePath` as a type # field on our `PayloadMsg.pld`, we still need a enc/dec_hook() pair # to cast to/from that type on the wire. See the docs: # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types - def enc_nsp(obj: Any) -> Any: - print(f'{uid} ENC HOOK') - match obj: - case NamespacePath(): - print( - f'{uid}: `NamespacePath`-Only ENCODE?\n' - f'obj-> `{obj}`: {type(obj)}\n' - ) - # if type(obj) != NamespacePath: - # breakpoint() - return str(obj) - - print( - f'{uid}\n' - 'CUSTOM ENCODE\n' - f'obj-arg-> `{obj}`: {type(obj)}\n' - ) - logmsg: str = ( - f'{uid}\n' - 'FAILED ENCODE\n' - f'obj-> `{obj}: {type(obj)}`\n' - ) - raise NotImplementedError(logmsg) - - def dec_nsp( - obj_type: Type, - obj: Any, - - ) -> Any: - print( - f'{uid}\n' - 'CUSTOM DECODE\n' - f'type-arg-> {obj_type}\n' - f'obj-arg-> `{obj}`: {type(obj)}\n' - ) - nsp = None - - if ( - obj_type is NamespacePath - and isinstance(obj, str) - and ':' in obj - ): - nsp = NamespacePath(obj) - # TODO: we could built a generic handler using - # JUST matching the obj_type part? - # nsp = obj_type(obj) - - if nsp: - print(f'Returning NSP instance: {nsp}') - return nsp - - logmsg: str = ( - f'{uid}\n' - 'FAILED DECODE\n' - f'type-> {obj_type}\n' - f'obj-arg-> `{obj}`: {type(obj)}\n\n' - f'current codec:\n' - f'{current_codec()}\n' - ) - # TODO: figure out the ignore subsys for this! - # -[ ] option whether to defense-relay backc the msg - # inside an `Invalid`/`Ignore` - # -[ ] how to make this handling pluggable such that a - # `Channel`/`MsgTransport` can intercept and process - # back msgs either via exception handling or some other - # signal? - log.warning(logmsg) - # NOTE: this delivers the invalid - # value up to `msgspec`'s decoding - # machinery for error raising. 
- return obj - # raise NotImplementedError(logmsg) + # if pld_spec is Any: + # pld_spec = Raw nsp_codec: MsgCodec = mk_codec( - ipc_pld_spec=pld_spec, + # ipc_pld_spec=Raw, # default! # NOTE XXX: the encode hook MUST be used no matter what since # our `NamespacePath` is not any of a `Any` native type nor @@ -153,8 +91,9 @@ def mk_custom_codec( # XXX NOTE: pretty sure this is mutex with the `type=` to # `Decoder`? so it won't work in tandem with the # `ipc_pld_spec` passed above? - dec_hook=dec_nsp if add_hooks else None, + ext_types=[NamespacePath], ) + # dec_hook=dec_nsp if add_hooks else None, return nsp_codec @@ -365,7 +304,7 @@ async def send_back_values( expect_debug: bool, pld_spec_type_strs: list[str], add_hooks: bool, - started_msg_bytes: bytes, + # started_msg_bytes: bytes, expect_ipc_send: dict[str, tuple[Any, bool]], ) -> None: @@ -392,24 +331,36 @@ async def send_back_values( # same as on parent side config. nsp_codec: MsgCodec = mk_custom_codec( - pld_spec=ipc_pld_spec, add_hooks=add_hooks, ) with ( apply_codec(nsp_codec) as codec, + limit_plds(ipc_pld_spec) as codec, ): + # we SHOULD NOT be swapping the global codec since it breaks + # `Context.starte()` roundtripping checks! chk_codec_applied( expect_codec=nsp_codec, - enter_value=codec, ) + # XXX SO NOT THIS! + # chk_codec_applied( + # expect_codec=nsp_codec, + # enter_value=codec, + # ) print( f'{uid}: attempting `Started`-bytes DECODE..\n' ) try: - msg: Started = nsp_codec.decode(started_msg_bytes) - expected_pld_spec_str: str = msg.pld - assert pld_spec_str == expected_pld_spec_str + # msg: Started = nsp_codec.decode(started_msg_bytes) + + ipc_spec: Type = ctx._pld_rx._pld_dec.spec + expected_pld_spec_str: str = str(ipc_spec) + assert ( + pld_spec_str == expected_pld_spec_str + and + ipc_pld_spec == ipc_spec + ) # TODO: maybe we should add our own wrapper error so as to # be interchange-lib agnostic? 
@@ -427,12 +378,15 @@ async def send_back_values( else: print( f'{uid}: (correctly) unable to DECODE `Started`-bytes\n' - f'{started_msg_bytes}\n' + # f'{started_msg_bytes}\n' ) iter_send_val_items = iter(expect_ipc_send.values()) sent: list[Any] = [] - for send_value, expect_send in iter_send_val_items: + for ( + send_value, + expect_send, + ) in iter_send_val_items: try: print( f'{uid}: attempting to `.started({send_value})`\n' @@ -457,12 +411,13 @@ async def send_back_values( break # move on to streaming block.. - except tractor.MsgTypeError: - await tractor.pause() + except tractor.MsgTypeError as _mte: + mte = _mte + # await tractor.pause() if expect_send: raise RuntimeError( - f'EXPECTED to `.started()` value given spec:\n' + f'EXPECTED to `.started()` value given spec ??\n\n' f'ipc_pld_spec -> {ipc_pld_spec}\n' f'value -> {send_value}: {type(send_value)}\n' ) @@ -530,10 +485,6 @@ async def send_back_values( # ) -def ex_func(*args): - print(f'ex_func({args})') - - @pytest.mark.parametrize( 'ipc_pld_spec', [ @@ -593,7 +544,6 @@ def test_codec_hooks_mod( # - codec modified with hooks -> decode nsp as # `NamespacePath` nsp_codec: MsgCodec = mk_custom_codec( - pld_spec=ipc_pld_spec, add_hooks=add_codec_hooks, ) with apply_codec(nsp_codec) as codec: @@ -609,7 +559,11 @@ def test_codec_hooks_mod( f'ipc_pld_spec: {ipc_pld_spec}\n' ' ------ - ------\n' ) - for val_type_str, val, expect_send in iter_maybe_sends( + for ( + val_type_str, + val, + expect_send, + )in iter_maybe_sends( send_items, ipc_pld_spec, add_codec_hooks=add_codec_hooks, @@ -618,7 +572,10 @@ def test_codec_hooks_mod( f'send_value: {val}: {type(val)} ' f'=> expect_send: {expect_send}\n' ) - expect_ipc_send[val_type_str] = (val, expect_send) + expect_ipc_send[val_type_str] = ( + val, + expect_send, + ) print( report + @@ -627,9 +584,24 @@ def test_codec_hooks_mod( assert len(expect_ipc_send) == len(send_items) # now try over real IPC with a the subactor # expect_ipc_rountrip: bool = True + + if ( + 
subtypes := getattr( + ipc_pld_spec, '__args__', False + ) + ): + pld_types_str: str = '|'.join(subtypes) + breakpoint() + else: + pld_types_str: str = ipc_pld_spec.__name__ + expected_started = Started( cid='cid', - pld=str(ipc_pld_spec), + # pld=str(pld_types_str), + pld=ipc_pld_spec, + ) + started_msg_bytes: bytes = nsp_codec.encode( + expected_started, ) # build list of values we expect to receive from # the subactor. @@ -655,7 +627,7 @@ def test_codec_hooks_mod( expect_debug=debug_mode, pld_spec_type_strs=pld_spec_type_strs, add_hooks=add_codec_hooks, - started_msg_bytes=nsp_codec.encode(expected_started), + started_msg_bytes=started_msg_bytes, # XXX NOTE bc we send a `NamespacePath` in this kwarg expect_ipc_send=expect_ipc_send, @@ -673,6 +645,8 @@ def test_codec_hooks_mod( # test with `limit_msg_spec()` above? # await tractor.pause() print('PARENT opening IPC ctx!\n') + ctx: tractor.Context + ipc: tractor.MsgStream async with ( # XXX should raise an mte (`MsgTypeError`) @@ -877,6 +851,10 @@ def chk_pld_type( def test_limit_msgspec( debug_mode: bool, ): + ''' + Verify that type-limiting the + + ''' async def main(): async with tractor.open_root_actor( debug_mode=debug_mode, @@ -915,3 +893,188 @@ def test_limit_msgspec( # breakpoint() trio.run(main) + + +def enc_nsp(obj: Any) -> Any: + actor: Actor = tractor.current_actor( + err_on_no_runtime=False, + ) + uid: tuple[str, str]|None = None if not actor else actor.uid + print(f'{uid} ENC HOOK') + + match obj: + # case NamespacePath()|str(): + case NamespacePath(): + encoded: str = str(obj) + print( + f'----- ENCODING `NamespacePath` as `str` ------\n' + f'|_obj:{type(obj)!r} = {obj!r}\n' + f'|_encoded: str = {encoded!r}\n' + ) + # if type(obj) != NamespacePath: + # breakpoint() + return encoded + case _: + logmsg: str = ( + f'{uid}\n' + 'FAILED ENCODE\n' + f'obj-> `{obj}: {type(obj)}`\n' + ) + raise NotImplementedError(logmsg) + + +def dec_nsp( + obj_type: Type, + obj: Any, + +) -> Any: + # breakpoint() + actor: 
Actor = tractor.current_actor( + err_on_no_runtime=False, + ) + uid: tuple[str, str]|None = None if not actor else actor.uid + print( + f'{uid}\n' + 'CUSTOM DECODE\n' + f'type-arg-> {obj_type}\n' + f'obj-arg-> `{obj}`: {type(obj)}\n' + ) + nsp = None + # XXX, never happens right? + if obj_type is Raw: + breakpoint() + + if ( + obj_type is NamespacePath + and isinstance(obj, str) + and ':' in obj + ): + nsp = NamespacePath(obj) + # TODO: we could built a generic handler using + # JUST matching the obj_type part? + # nsp = obj_type(obj) + + if nsp: + print(f'Returning NSP instance: {nsp}') + return nsp + + logmsg: str = ( + f'{uid}\n' + 'FAILED DECODE\n' + f'type-> {obj_type}\n' + f'obj-arg-> `{obj}`: {type(obj)}\n\n' + f'current codec:\n' + f'{current_codec()}\n' + ) + # TODO: figure out the ignore subsys for this! + # -[ ] option whether to defense-relay backc the msg + # inside an `Invalid`/`Ignore` + # -[ ] how to make this handling pluggable such that a + # `Channel`/`MsgTransport` can intercept and process + # back msgs either via exception handling or some other + # signal? + log.warning(logmsg) + # NOTE: this delivers the invalid + # value up to `msgspec`'s decoding + # machinery for error raising. + return obj + # raise NotImplementedError(logmsg) + + +def ex_func(*args): + ''' + A mod level func we can ref and load via our `NamespacePath` + python-object pointer `str` subtype. + + ''' + print(f'ex_func({args})') + + +@pytest.mark.parametrize( + 'add_codec_hooks', + [ + True, + False, + ], + ids=['use_codec_hooks', 'no_codec_hooks'], +) +def test_custom_extension_types( + debug_mode: bool, + add_codec_hooks: bool +): + ''' + Verify that a `MsgCodec` (used for encoding all outbound IPC msgs + and decoding all inbound `PayloadMsg`s) and a paired `MsgDec` + (used for decoding the `PayloadMsg.pld: Raw` received within a given + task's ipc `Context` scope) can both send and receive "extension types" + as supported via custom converter hooks passed to `msgspec`. 
+ + ''' + nsp_pld_dec: MsgDec = mk_dec( + spec=None, # ONLY support the ext type + dec_hook=dec_nsp if add_codec_hooks else None, + ext_types=[NamespacePath], + ) + nsp_codec: MsgCodec = mk_codec( + # ipc_pld_spec=Raw, # default! + + # NOTE XXX: the encode hook MUST be used no matter what since + # our `NamespacePath` is not any of a `Any` native type nor + # a `msgspec.Struct` subtype - so `msgspec` has no way to know + # how to encode it unless we provide the custom hook. + # + # AGAIN that is, regardless of whether we spec an + # `Any`-decoded-pld the enc has no knowledge (by default) + # how to enc `NamespacePath` (nsp), so we add a custom + # hook to do that ALWAYS. + enc_hook=enc_nsp if add_codec_hooks else None, + + # XXX NOTE: pretty sure this is mutex with the `type=` to + # `Decoder`? so it won't work in tandem with the + # `ipc_pld_spec` passed above? + ext_types=[NamespacePath], + + # TODO? is it useful to have the `.pld` decoded *prior* to + # the `PldRx`?? like perf or mem related? + # ext_dec=nsp_pld_dec, + ) + if add_codec_hooks: + assert nsp_codec.dec.dec_hook is None + + # TODO? if we pass `ext_dec` above? + # assert nsp_codec.dec.dec_hook is dec_nsp + + assert nsp_codec.enc.enc_hook is enc_nsp + + nsp = NamespacePath.from_ref(ex_func) + + try: + nsp_bytes: bytes = nsp_codec.encode(nsp) + nsp_rt_sin_msg = nsp_pld_dec.decode(nsp_bytes) + nsp_rt_sin_msg.load_ref() is ex_func + except TypeError: + if not add_codec_hooks: + pass + + try: + msg_bytes: bytes = nsp_codec.encode( + Started( + cid='cid', + pld=nsp, + ) + ) + # since the ext-type obj should also be set as the msg.pld + assert nsp_bytes in msg_bytes + started_rt: Started = nsp_codec.decode(msg_bytes) + pld: Raw = started_rt.pld + assert isinstance(pld, Raw) + nsp_rt: NamespacePath = nsp_pld_dec.decode(pld) + assert isinstance(nsp_rt, NamespacePath) + # in obj comparison terms they should be the same + assert nsp_rt == nsp + # ensure we've decoded to ext type! 
+ assert nsp_rt.load_ref() is ex_func + + except TypeError: + if not add_codec_hooks: + pass diff --git a/tractor/msg/__init__.py b/tractor/msg/__init__.py index 44586f2d..88220054 100644 --- a/tractor/msg/__init__.py +++ b/tractor/msg/__init__.py @@ -33,6 +33,7 @@ from ._codec import ( apply_codec as apply_codec, mk_codec as mk_codec, + mk_dec as mk_dec, MsgCodec as MsgCodec, MsgDec as MsgDec, current_codec as current_codec, diff --git a/tractor/msg/_codec.py b/tractor/msg/_codec.py index 32f690f1..46716d4c 100644 --- a/tractor/msg/_codec.py +++ b/tractor/msg/_codec.py @@ -61,6 +61,7 @@ from tractor.msg.pretty_struct import Struct from tractor.msg.types import ( mk_msg_spec, MsgType, + PayloadMsg, ) from tractor.log import get_logger @@ -80,6 +81,7 @@ class MsgDec(Struct): ''' _dec: msgpack.Decoder + # _ext_types_box: Struct|None = None @property def dec(self) -> msgpack.Decoder: @@ -179,23 +181,122 @@ class MsgDec(Struct): def mk_dec( - spec: Union[Type[Struct]]|Any = Any, + spec: Union[Type[Struct]]|Type|None, + + # NOTE, required for ad-hoc type extensions to the underlying + # serialization proto (which is default `msgpack`), + # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types dec_hook: Callable|None = None, + ext_types: list[Type]|None = None, ) -> MsgDec: ''' - Create an IPC msg decoder, normally used as the - `PayloadMsg.pld: PayloadT` field decoder inside a `PldRx`. + Create an IPC msg decoder, a slightly higher level wrapper around + a `msgspec.msgpack.Decoder` which provides, + + - easier introspection of the underlying type spec via + the `.spec` and `.spec_str` attrs, + - `.hook` access to the `Decoder.dec_hook()`, + - automatic custom extension-types decode support when + `dec_hook()` is provided such that any `PayloadMsg.pld` tagged + as a type from from `ext_types` (presuming the `MsgCodec.encode()` also used + a `.enc_hook()`) is processed and constructed by a `PldRx` implicitily. 
+ + NOTE, as mentioned a `MsgDec` is normally used for `PayloadMsg.pld: PayloadT` field + decoding inside an IPC-ctx-oriented `PldRx`. ''' + if ( + spec is None + and + ext_types is None + ): + raise ValueError( + f'You must provide a type-spec for a msg decoder!\n' + f'The only time `spec=None` is permitted is if custom extension types ' + f'are expected to be supported, in which case `ext_types` must be non-`None`' + f'and it is presumed that only the `ext_types` (supported by the paired `dec_hook()`) ' + f'will be permitted within the type-`spec`!\n' + + f'tpec = {spec!r}\n' + f'dec_hook = {dec_hook!r}\n' + f'ext_types = {ext_types!r}\n' + ) + + if dec_hook: + if ext_types is None: + raise ValueError( + f'If extending the serializable types with a custom decoder hook, ' + f'you must also provide the expected type set `dec_hook()` will handle ' + f'via the `ext_types: Union[Type]|None = None` argument!\n' + f'dec_hook = {dec_hook!r}\n' + f'ext_types = {ext_types!r}\n' + ) + + # XXX, i *thought* we would require a boxing struct as per docs, + # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types + # |_ see comment, + # > Note that typed deserialization is required for + # > successful roundtripping here, so we pass `MyMessage` to + # > `Decoder`. + # + # BUT, turns out as long as you spec a union with `Raw` it + # will work? kk B) + # + # maybe_box_struct = mk_boxed_ext_struct(ext_types) + spec = Raw | Union[*ext_types] + return MsgDec( _dec=msgpack.Decoder( type=spec, # like `MsgType[Any]` dec_hook=dec_hook, - ) + ), ) +# TODO? remove since didn't end up needing this? +def mk_boxed_ext_struct( + ext_types: list[Type], +) -> Struct: + # NOTE, originally was to wrap non-msgpack-supported "extension + # types" in a field-typed boxing struct, see notes around the + # `dec_hook()` branch in `mk_dec()`. 
+ ext_types_union = Union[*ext_types] + repr_ext_types_union: str = ( + str(ext_types_union) + or + "|".join(ext_types) + ) + BoxedExtType = msgspec.defstruct( + f'BoxedExts[{repr_ext_types_union}]', + fields=[ + ('boxed', ext_types_union), + ], + ) + return BoxedExtType + + +def unpack_spec_types( + spec: Union[Type]|Type, +) -> set[Type]: + ''' + Given an input type-`spec`, either a lone type + or a `Union` of types (like `str|int|MyThing`), + return a set of individual types. + + When `spec` is not a type-union returns `{spec,}`. + + ''' + spec_subtypes: set[Union[Type]] = ( + getattr( + spec, + '__args__', + {spec,}, + ) + ) + return spec_subtypes + + def mk_msgspec_table( dec: msgpack.Decoder, msg: MsgType|None = None, @@ -273,6 +374,8 @@ class MsgCodec(Struct): _dec: msgpack.Decoder _pld_spec: Type[Struct]|Raw|Any + # _ext_types_box: Struct|None = None + def __repr__(self) -> str: speclines: str = textwrap.indent( pformat_msgspec(codec=self), @@ -339,12 +442,14 @@ class MsgCodec(Struct): def encode( self, - py_obj: Any, + py_obj: Any|PayloadMsg, use_buf: bool = False, # ^-XXX-^ uhh why am i getting this? # |_BufferError: Existing exports of data: object cannot be re-sized + as_ext_type: bool = False, + ) -> bytes: ''' Encode input python objects to `msgpack` bytes for @@ -357,8 +462,33 @@ class MsgCodec(Struct): if use_buf: self._enc.encode_into(py_obj, self._buf) return self._buf - else: - return self._enc.encode(py_obj) + + return self._enc.encode(py_obj) + # TODO! REMOVE once i'm confident we won't ever need it! 
+ # + # box: Struct = self._ext_types_box + # if ( + # as_ext_type + # or + # ( + # # XXX NOTE, auto-detect if the input type + # box + # and + # (ext_types := unpack_spec_types( + # spec=box.__annotations__['boxed']) + # ) + # ) + # ): + # match py_obj: + # # case PayloadMsg(pld=pld) if ( + # # type(pld) in ext_types + # # ): + # # py_obj.pld = box(boxed=py_obj) + # # breakpoint() + # case _ if ( + # type(py_obj) in ext_types + # ): + # py_obj = box(boxed=py_obj) @property def dec(self) -> msgpack.Decoder: @@ -378,21 +508,30 @@ class MsgCodec(Struct): return self._dec.decode(msg) -# [x] TODO: a sub-decoder system as well? => No! +# ?TODO? time to remove this finally? +# +# -[x] TODO: a sub-decoder system as well? +# => No! already re-architected to include a "payload-receiver" +# now found in `._ops`. # # -[x] do we still want to try and support the sub-decoder with # `.Raw` technique in the case that the `Generic` approach gives # future grief? -# => NO, since we went with the `PldRx` approach instead B) +# => well YES but NO, since we went with the `PldRx` approach +# instead! # # IF however you want to see the code that was staged for this # from wayyy back, see the pure removal commit. def mk_codec( - # struct type unions set for `Decoder` - # https://jcristharif.com/msgspec/structs.html#tagged-unions - ipc_pld_spec: Union[Type[Struct]]|Any = Any, + ipc_pld_spec: Union[Type[Struct]]|Any|Raw = Raw, + # tagged-struct-types-union set for `Decoder`ing of payloads, as + # per https://jcristharif.com/msgspec/structs.html#tagged-unions. + # NOTE that the default `Raw` here **is very intentional** since + # the `PldRx._pld_dec: MsgDec` is responsible for per ipc-ctx-task + # decoding of msg-specs defined by the user as part of **their** + # `tractor` "app's" type-limited IPC msg-spec. 
# TODO: offering a per-msg(-field) type-spec such that # the fields can be dynamically NOT decoded and left as `Raw` @@ -405,13 +544,18 @@ def mk_codec( libname: str = 'msgspec', - # proxy as `Struct(**kwargs)` for ad-hoc type extensions + # settings for encoding-to-send extension-types, # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types - # ------ - ------ - dec_hook: Callable|None = None, + # dec_hook: Callable|None = None, enc_hook: Callable|None = None, - # ------ - ------ + ext_types: list[Type]|None = None, + + # optionally provided msg-decoder from which we pull its, + # |_.dec_hook() + # |_.type + ext_dec: MsgDec|None = None # + # ?TODO? other params we might want to support # Encoder: # write_buffer_size=write_buffer_size, # @@ -425,26 +569,43 @@ def mk_codec( `msgspec` ;). ''' - # (manually) generate a msg-payload-spec for all relevant - # god-boxing-msg subtypes, parameterizing the `PayloadMsg.pld: PayloadT` - # for the decoder such that all sub-type msgs in our SCIPP - # will automatically decode to a type-"limited" payload (`Struct`) - # object (set). + pld_spec = ipc_pld_spec + if enc_hook: + if not ext_types: + raise ValueError( + f'If extending the serializable types with a custom decoder hook, ' + f'you must also provide the expected type set `enc_hook()` will handle ' + f'via the `ext_types: Union[Type]|None = None` argument!\n' + f'enc_hook = {enc_hook!r}\n' + f'ext_types = {ext_types!r}\n' + ) + + dec_hook: Callable|None = None + if ext_dec: + dec: msgspec.Decoder = ext_dec.dec + dec_hook = dec.dec_hook + pld_spec |= dec.type + if ext_types: + pld_spec |= Union[*ext_types] + + # (manually) generate a msg-spec (how appropes) for all relevant + # payload-boxing-struct-msg-types, parameterizing the + # `PayloadMsg.pld: PayloadT` for the decoder such that all msgs + # in our SC-RPC-protocol will automatically decode to + # a type-"limited" payload (`Struct`) object (set). 
( ipc_msg_spec, msg_types, ) = mk_msg_spec( - payload_type_union=ipc_pld_spec, + payload_type_union=pld_spec, ) - assert len(ipc_msg_spec.__args__) == len(msg_types) - assert ipc_msg_spec - # TODO: use this shim instead? - # bc.. unification, err somethin? - # dec: MsgDec = mk_dec( - # spec=ipc_msg_spec, - # dec_hook=dec_hook, - # ) + msg_spec_types: set[Type] = unpack_spec_types(ipc_msg_spec) + assert ( + len(ipc_msg_spec.__args__) == len(msg_types) + and + len(msg_spec_types) == len(msg_types) + ) dec = msgpack.Decoder( type=ipc_msg_spec, @@ -453,22 +614,29 @@ def mk_codec( enc = msgpack.Encoder( enc_hook=enc_hook, ) - codec = MsgCodec( _enc=enc, _dec=dec, - _pld_spec=ipc_pld_spec, + _pld_spec=pld_spec, ) - # sanity on expected backend support assert codec.lib.__name__ == libname - return codec # instance of the default `msgspec.msgpack` codec settings, i.e. # no custom structs, hooks or other special types. -_def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any) +# +# XXX NOTE XXX, this will break our `Context.start()` call! +# +# * by default we roundtrip the started pld-`value` and if you apply +# this codec (globally anyway with `apply_codec()`) then the +# `roundtripped` value will include a non-`.pld: Raw` which will +# then type-error on the consequent `._ops.validte_payload_msg()`.. +# +_def_msgspec_codec: MsgCodec = mk_codec( + ipc_pld_spec=Any, +) # The built-in IPC `Msg` spec. # Our composing "shuttle" protocol which allows `tractor`-app code @@ -476,13 +644,13 @@ _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any) # https://jcristharif.com/msgspec/supported-types.html # _def_tractor_codec: MsgCodec = mk_codec( - # TODO: use this for debug mode locking prot? - # ipc_pld_spec=Any, - ipc_pld_spec=Raw, + ipc_pld_spec=Raw, # XXX should be default righ!? 
) -# TODO: IDEALLY provides for per-`trio.Task` specificity of the + +# -[x] TODO, IDEALLY provides for per-`trio.Task` specificity of the # IPC msging codec used by the transport layer when doing # `Channel.send()/.recv()` of wire data. +# => impled as our `PldRx` which is `Context` scoped B) # ContextVar-TODO: DIDN'T WORK, kept resetting in every new task to default!? # _ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar( @@ -559,17 +727,6 @@ def apply_codec( ) token: Token = var.set(codec) - # ?TODO? for TreeVar approach which copies from the - # cancel-scope of the prior value, NOT the prior task - # See the docs: - # - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables - # - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py - # ^- see docs for @cm `.being()` API - # with _ctxvar_MsgCodec.being(codec): - # new = _ctxvar_MsgCodec.get() - # assert new is codec - # yield codec - try: yield var.get() finally: @@ -580,6 +737,19 @@ def apply_codec( ) assert var.get() is orig + # ?TODO? for TreeVar approach which copies from the + # cancel-scope of the prior value, NOT the prior task + # + # See the docs: + # - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables + # - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py + # ^- see docs for @cm `.being()` API + # + # with _ctxvar_MsgCodec.being(codec): + # new = _ctxvar_MsgCodec.get() + # assert new is codec + # yield codec + def current_codec() -> MsgCodec: ''' @@ -599,6 +769,7 @@ def limit_msg_spec( # -> related to the `MsgCodec._payload_decs` stuff above.. # tagged_structs: list[Struct]|None = None, + hide_tb: bool = True, **codec_kwargs, ) -> MsgCodec: @@ -609,7 +780,7 @@ def limit_msg_spec( for all IPC contexts in use by the current `trio.Task`. 
''' - __tracebackhide__: bool = True + __tracebackhide__: bool = hide_tb curr_codec: MsgCodec = current_codec() msgspec_codec: MsgCodec = mk_codec( ipc_pld_spec=payload_spec, diff --git a/tractor/msg/_ops.py b/tractor/msg/_ops.py index dc632217..6f178ba5 100644 --- a/tractor/msg/_ops.py +++ b/tractor/msg/_ops.py @@ -50,7 +50,9 @@ from tractor._exceptions import ( _mk_recv_mte, pack_error, ) -from tractor._state import current_ipc_ctx +from tractor._state import ( + current_ipc_ctx, +) from ._codec import ( mk_dec, MsgDec, @@ -78,7 +80,7 @@ if TYPE_CHECKING: log = get_logger(__name__) -_def_any_pldec: MsgDec[Any] = mk_dec() +_def_any_pldec: MsgDec[Any] = mk_dec(spec=Any) class PldRx(Struct): @@ -148,6 +150,10 @@ class PldRx(Struct): exit. ''' + # TODO, ensure we pull the current `MsgCodec`'s custom + # dec/enc_hook settings as well ? + # -[ ] see `._codec.mk_codec()` inputs + # orig_dec: MsgDec = self._pld_dec limit_dec: MsgDec = mk_dec( spec=spec, diff --git a/tractor/msg/types.py b/tractor/msg/types.py index 0904411f..1cc8b78e 100644 --- a/tractor/msg/types.py +++ b/tractor/msg/types.py @@ -599,15 +599,15 @@ def mk_msg_spec( Msg[payload_type_union], Generic[PayloadT], ) - defstruct_bases: tuple = ( - Msg, # [payload_type_union], - # Generic[PayloadT], - # ^-XXX-^: not allowed? lul.. - ) + # defstruct_bases: tuple = ( + # Msg, # [payload_type_union], + # # Generic[PayloadT], + # # ^-XXX-^: not allowed? lul.. + # ) ipc_msg_types: list[Msg] = [] idx_msg_types: list[Msg] = [] - defs_msg_types: list[Msg] = [] + # defs_msg_types: list[Msg] = [] nc_msg_types: list[Msg] = [] for msgtype in __msg_types__: @@ -625,7 +625,7 @@ def mk_msg_spec( # TODO: wait why do we need the dynamic version here? # XXX ANSWER XXX -> BC INHERITANCE.. don't work w generics.. 
# - # NOTE previously bc msgtypes WERE NOT inheritting + # NOTE previously bc msgtypes WERE NOT inheriting # directly the `Generic[PayloadT]` type, the manual method # of generic-paraming with `.__class_getitem__()` wasn't # working.. @@ -662,38 +662,35 @@ def mk_msg_spec( # with `msgspec.structs.defstruct` # XXX ALSO DOESN'T WORK - defstruct_msgtype = defstruct( - name=msgtype.__name__, - fields=[ - ('cid', str), + # defstruct_msgtype = defstruct( + # name=msgtype.__name__, + # fields=[ + # ('cid', str), - # XXX doesn't seem to work.. - # ('pld', PayloadT), - - ('pld', payload_type_union), - ], - bases=defstruct_bases, - ) - defs_msg_types.append(defstruct_msgtype) + # # XXX doesn't seem to work.. + # # ('pld', PayloadT), + # ('pld', payload_type_union), + # ], + # bases=defstruct_bases, + # ) + # defs_msg_types.append(defstruct_msgtype) # assert index_paramed_msg_type == manual_paramed_msg_subtype - # paramed_msg_type = manual_paramed_msg_subtype - # ipc_payload_msgs_type_union |= index_paramed_msg_type idx_spec: Union[Type[Msg]] = Union[*idx_msg_types] - def_spec: Union[Type[Msg]] = Union[*defs_msg_types] + # def_spec: Union[Type[Msg]] = Union[*defs_msg_types] nc_spec: Union[Type[Msg]] = Union[*nc_msg_types] specs: dict[str, Union[Type[Msg]]] = { 'indexed_generics': idx_spec, - 'defstruct': def_spec, + # 'defstruct': def_spec, 'types_new_class': nc_spec, } msgtypes_table: dict[str, list[Msg]] = { 'indexed_generics': idx_msg_types, - 'defstruct': defs_msg_types, + # 'defstruct': defs_msg_types, 'types_new_class': nc_msg_types, } -- 2.34.1 From 56d0f10195a4b5576a1be251f5f7209143c3143c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 7 Mar 2025 14:38:22 -0500 Subject: [PATCH 02/19] Move `Union` serializers to new `msg.` mod Namely moving `enc/dec_type_union()` from the test mod to a new `tractor.msg._exts` for general use outside the test suite. 
--- tests/test_caps_based_msging.py | 70 +++++-------------------- tractor/msg/_exts.py | 90 +++++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+), 57 deletions(-) create mode 100644 tractor/msg/_exts.py diff --git a/tests/test_caps_based_msging.py b/tests/test_caps_based_msging.py index 3c2a73cb..cdc6d59d 100644 --- a/tests/test_caps_based_msging.py +++ b/tests/test_caps_based_msging.py @@ -5,7 +5,6 @@ Low-level functional audits for our B~) ''' -import typing from typing import ( Any, Type, @@ -32,6 +31,7 @@ from tractor import ( from tractor.msg import ( _codec, _ctxvar_MsgCodec, + _exts, NamespacePath, MsgCodec, @@ -247,57 +247,6 @@ def iter_maybe_sends( ) -def dec_type_union( - type_names: list[str], -) -> Type: - ''' - Look up types by name, compile into a list and then create and - return a `typing.Union` from the full set. - - ''' - import importlib - types: list[Type] = [] - for type_name in type_names: - for mod in [ - typing, - importlib.import_module(__name__), - ]: - if type_ref := getattr( - mod, - type_name, - False, - ): - types.append(type_ref) - - # special case handling only.. - # ipc_pld_spec: Union[Type] = eval( - # pld_spec_str, - # {}, # globals - # {'typing': typing}, # locals - # ) - - return Union[*types] - - -def enc_type_union( - union_or_type: Union[Type]|Type, -) -> list[str]: - ''' - Encode a type-union or single type to a list of type-name-strings - ready for IPC interchange. 
- - ''' - type_strs: list[str] = [] - for typ in getattr( - union_or_type, - '__args__', - {union_or_type,}, - ): - type_strs.append(typ.__qualname__) - - return type_strs - - @tractor.context async def send_back_values( ctx: Context, @@ -324,7 +273,7 @@ async def send_back_values( ) # load pld spec from input str - ipc_pld_spec = dec_type_union( + ipc_pld_spec = _exts.dec_type_union( pld_spec_type_strs, ) pld_spec_str = str(ipc_pld_spec) @@ -413,7 +362,6 @@ async def send_back_values( except tractor.MsgTypeError as _mte: mte = _mte - # await tractor.pause() if expect_send: raise RuntimeError( @@ -422,6 +370,10 @@ async def send_back_values( f'value -> {send_value}: {type(send_value)}\n' ) + # await tractor.pause() + raise mte + + async with ctx.open_stream() as ipc: print( f'{uid}: Entering streaming block to send remaining values..' @@ -591,8 +543,9 @@ def test_codec_hooks_mod( ) ): pld_types_str: str = '|'.join(subtypes) - breakpoint() + # breakpoint() else: + # TODO, use `.msg._exts` utils instead of this! pld_types_str: str = ipc_pld_spec.__name__ expected_started = Started( @@ -611,7 +564,7 @@ def test_codec_hooks_mod( if expect_send ] - pld_spec_type_strs: list[str] = enc_type_union(ipc_pld_spec) + pld_spec_type_strs: list[str] = _exts.enc_type_union(ipc_pld_spec) # XXX should raise an mte (`MsgTypeError`) # when `add_codec_hooks == False` bc the input @@ -848,11 +801,14 @@ def chk_pld_type( return roundtrip +# ?TODO? remove since covered in the newer `test_pldrx_limiting`? def test_limit_msgspec( debug_mode: bool, ): ''' - Verify that type-limiting the + Internals unit testing to verify that type-limiting an IPC ctx's + msg spec with `Pldrx.limit_plds()` results in various + encapsulated `msgspec` object settings and state. ''' async def main(): diff --git a/tractor/msg/_exts.py b/tractor/msg/_exts.py new file mode 100644 index 00000000..abf7bcde --- /dev/null +++ b/tractor/msg/_exts.py @@ -0,0 +1,90 @@ +# tractor: structured concurrent "actors". 
+# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Type-extension-utils for codec-ing (python) objects not +covered by the `msgspec.msgpack` protocol. + +See the various API docs from `msgspec`. + +extending from native types, +- https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types + +converters, +- https://jcristharif.com/msgspec/converters.html +- https://jcristharif.com/msgspec/api.html#msgspec.convert + +`Raw` fields, +- https://jcristharif.com/msgspec/api.html#raw +- support for `.convert()` and `Raw`, + |_ https://jcristharif.com/msgspec/changelog.html + +''' +import typing +from typing import ( + Type, + Union, +) + +def dec_type_union( + type_names: list[str], +) -> Type: + ''' + Look up types by name, compile into a list and then create and + return a `typing.Union` from the full set. + + ''' + import importlib + types: list[Type] = [] + for type_name in type_names: + for mod in [ + typing, + importlib.import_module(__name__), + ]: + if type_ref := getattr( + mod, + type_name, + False, + ): + types.append(type_ref) + + # special case handling only.. 
+ # ipc_pld_spec: Union[Type] = eval( + # pld_spec_str, + # {}, # globals + # {'typing': typing}, # locals + # ) + + return Union[*types] + + +def enc_type_union( + union_or_type: Union[Type]|Type, +) -> list[str]: + ''' + Encode a type-union or single type to a list of type-name-strings + ready for IPC interchange. + + ''' + type_strs: list[str] = [] + for typ in getattr( + union_or_type, + '__args__', + {union_or_type,}, + ): + type_strs.append(typ.__qualname__) + + return type_strs -- 2.34.1 From f8cc03463019384b1abebf99042f49c1ccbda577 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 8 Mar 2025 15:48:18 -0500 Subject: [PATCH 03/19] Tweak type-error messages for when `ext_types` is missing --- tractor/msg/_codec.py | 50 ++++++++++++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 17 deletions(-) diff --git a/tractor/msg/_codec.py b/tractor/msg/_codec.py index 46716d4c..1e9623af 100644 --- a/tractor/msg/_codec.py +++ b/tractor/msg/_codec.py @@ -211,24 +211,28 @@ def mk_dec( and ext_types is None ): - raise ValueError( - f'You must provide a type-spec for a msg decoder!\n' - f'The only time `spec=None` is permitted is if custom extension types ' - f'are expected to be supported, in which case `ext_types` must be non-`None`' - f'and it is presumed that only the `ext_types` (supported by the paired `dec_hook()`) ' - f'will be permitted within the type-`spec`!\n' - - f'tpec = {spec!r}\n' + raise TypeError( + f'MIssing type-`spec` for msg decoder!\n' + f'\n' + f'`spec=None` is **only** permitted is if custom extension types ' + f'are provided via `ext_types`, meaning it must be non-`None`.\n' + f'\n' + f'In this case it is presumed that only the `ext_types`, ' + f'which much be handled by a paired `dec_hook()`, ' + f'will be permitted within the payload type-`spec`!\n' + f'\n' + f'spec = {spec!r}\n' f'dec_hook = {dec_hook!r}\n' f'ext_types = {ext_types!r}\n' ) if dec_hook: if ext_types is None: - raise ValueError( - f'If extending the 
serializable types with a custom decoder hook, ' - f'you must also provide the expected type set `dec_hook()` will handle ' - f'via the `ext_types: Union[Type]|None = None` argument!\n' + raise TypeError( + f'If extending the serializable types with a custom decode hook (`dec_hook()`), ' + f'you must also provide the expected type set that the hook will handle ' + f'via a `ext_types: Union[Type]|None = None` argument!\n' + f'\n' f'dec_hook = {dec_hook!r}\n' f'ext_types = {ext_types!r}\n' ) @@ -287,7 +291,7 @@ def unpack_spec_types( When `spec` is not a type-union returns `{spec,}`. ''' - spec_subtypes: set[Union[Type]] = ( + spec_subtypes: set[Union[Type]] = set( getattr( spec, '__args__', @@ -449,6 +453,7 @@ class MsgCodec(Struct): # |_BufferError: Existing exports of data: object cannot be re-sized as_ext_type: bool = False, + hide_tb: bool = True, ) -> bytes: ''' @@ -459,11 +464,21 @@ class MsgCodec(Struct): https://jcristharif.com/msgspec/perf-tips.html#reusing-an-output-buffer ''' + __tracebackhide__: bool = hide_tb if use_buf: self._enc.encode_into(py_obj, self._buf) return self._buf return self._enc.encode(py_obj) + # try: + # return self._enc.encode(py_obj) + # except TypeError as typerr: + # typerr.add_note( + # '|_src error from `msgspec`' + # # f'|_{self._enc.encode!r}' + # ) + # raise typerr + # TODO! REMOVE once i'm confident we won't ever need it! 
# # box: Struct = self._ext_types_box @@ -572,10 +587,11 @@ def mk_codec( pld_spec = ipc_pld_spec if enc_hook: if not ext_types: - raise ValueError( - f'If extending the serializable types with a custom decoder hook, ' - f'you must also provide the expected type set `enc_hook()` will handle ' - f'via the `ext_types: Union[Type]|None = None` argument!\n' + raise TypeError( + f'If extending the serializable types with a custom encode hook (`enc_hook()`), ' + f'you must also provide the expected type set that the hook will handle ' + f'via a `ext_types: Union[Type]|None = None` argument!\n' + f'\n' f'enc_hook = {enc_hook!r}\n' f'ext_types = {ext_types!r}\n' ) -- 2.34.1 From fbb8c7ecd0cd38d6eea69e3edc767cd5a50816ad Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 8 Mar 2025 15:49:21 -0500 Subject: [PATCH 04/19] Offer a `mods: list` to `dec_type_union()`; drop importing this-mod --- tractor/msg/_exts.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tractor/msg/_exts.py b/tractor/msg/_exts.py index abf7bcde..31eafb5d 100644 --- a/tractor/msg/_exts.py +++ b/tractor/msg/_exts.py @@ -33,6 +33,9 @@ converters, |_ https://jcristharif.com/msgspec/changelog.html ''' +from types import ( + ModuleType, +) import typing from typing import ( Type, @@ -41,19 +44,20 @@ from typing import ( def dec_type_union( type_names: list[str], -) -> Type: + mods: list[ModuleType] = [] +) -> Type|Union[Type]: ''' Look up types by name, compile into a list and then create and return a `typing.Union` from the full set. 
''' - import importlib + # import importlib types: list[Type] = [] for type_name in type_names: for mod in [ typing, - importlib.import_module(__name__), - ]: + # importlib.import_module(__name__), + ] + mods: if type_ref := getattr( mod, type_name, -- 2.34.1 From 337385762ba1de5628ab67422186562c6fb6a6fa Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 8 Mar 2025 15:50:14 -0500 Subject: [PATCH 05/19] Raise RTE from `limit_plds()` on no `curr_ctx` Since it should only be used from within a `Portal.open_context()` scope, make sure the caller knows that! Also don't hide the frame in tb if the immediate function errors.. --- tractor/msg/_ops.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/tractor/msg/_ops.py b/tractor/msg/_ops.py index 6f178ba5..839be532 100644 --- a/tractor/msg/_ops.py +++ b/tractor/msg/_ops.py @@ -461,11 +461,16 @@ def limit_plds( ''' __tracebackhide__: bool = True + curr_ctx: Context|None = current_ipc_ctx() + if curr_ctx is None: + raise RuntimeError( + 'No IPC `Context` is active !?\n' + 'Did you open `limit_plds()` from outside ' + 'a `Portal.open_context()` scope-block?' + ) try: - curr_ctx: Context = current_ipc_ctx() rx: PldRx = curr_ctx._pld_rx orig_pldec: MsgDec = rx.pld_dec - with rx.limit_plds( spec=spec, **dec_kwargs, @@ -475,6 +480,11 @@ def limit_plds( f'{pldec}\n' ) yield pldec + + except BaseException: + __tracebackhide__: bool = False + raise + finally: log.runtime( 'Reverted to previous payload-decoder\n\n' -- 2.34.1 From 048a232a6eb07b8b6780b0b498cb7acfff06d109 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 8 Mar 2025 15:52:13 -0500 Subject: [PATCH 06/19] Rework IPC-using `test_caps_basesd_msging` tests Namely renaming and massively simplifying it to a new `test_ext_types_over_ipc` which avoids all the wacky "parent dictates what sender should be able to send beforehand".. 
Instead keep it simple and just always try to send the same small set of types over the wire with expect-logic to handle each case, - use the new `dec_hook`/`ext_types` args to `mk_[co]dec()` routines for pld-spec ipc transport. - always try to stream a small set of types from the child with logic to handle the cases expected to error. Other, - draft a `test_pld_limiting_usage` to check runtime raising of bad API usage; haven't run it yet tho. - move `test_custom_extension_types` to top of mod so that the `enc/dec_nsp()` hooks can be reffed from test parametrizations. - comment out (and maybe remove) the old routines for `iter_maybe_sends`, `test_limit_msgspec`, `chk_pld_type`. XXX TODO, turns out the 2 failing cases from this suite have exposed an an actual bug with `MsgTypeError` unpacking where the `ipc_msg=` input is being set to `None` ?? -> see the comment at the bottom of `._exceptions._mk_recv_mte()` which seems to describe the likely culprit? --- tests/test_caps_based_msging.py | 1631 ++++++++++++++++--------------- 1 file changed, 825 insertions(+), 806 deletions(-) diff --git a/tests/test_caps_based_msging.py b/tests/test_caps_based_msging.py index cdc6d59d..4d78a117 100644 --- a/tests/test_caps_based_msging.py +++ b/tests/test_caps_based_msging.py @@ -5,6 +5,11 @@ Low-level functional audits for our B~) ''' +from contextlib import ( + contextmanager as cm, + # nullcontext, +) +import importlib from typing import ( Any, Type, @@ -12,10 +17,10 @@ from typing import ( ) from msgspec import ( - structs, - msgpack, + # structs, + # msgpack, Raw, - Struct, + # Struct, ValidationError, ) import pytest @@ -24,7 +29,7 @@ import trio import tractor from tractor import ( Actor, - _state, + # _state, MsgTypeError, Context, ) @@ -42,815 +47,16 @@ from tractor.msg import ( current_codec, ) from tractor.msg.types import ( - _payload_msgs, log, - PayloadMsg, Started, - mk_msg_spec, + # _payload_msgs, + # PayloadMsg, + # mk_msg_spec, ) from tractor.msg._ops import ( 
limit_plds, ) - -def mk_custom_codec( - add_hooks: bool, - -) -> tuple[ - MsgCodec, # encode to send - MsgDec, # pld receive-n-decode -]: - ''' - Create custom `msgpack` enc/dec-hooks and set a `Decoder` - which only loads `pld_spec` (like `NamespacePath`) types. - - ''' - - # XXX NOTE XXX: despite defining `NamespacePath` as a type - # field on our `PayloadMsg.pld`, we still need a enc/dec_hook() pair - # to cast to/from that type on the wire. See the docs: - # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types - - # if pld_spec is Any: - # pld_spec = Raw - - nsp_codec: MsgCodec = mk_codec( - # ipc_pld_spec=Raw, # default! - - # NOTE XXX: the encode hook MUST be used no matter what since - # our `NamespacePath` is not any of a `Any` native type nor - # a `msgspec.Struct` subtype - so `msgspec` has no way to know - # how to encode it unless we provide the custom hook. - # - # AGAIN that is, regardless of whether we spec an - # `Any`-decoded-pld the enc has no knowledge (by default) - # how to enc `NamespacePath` (nsp), so we add a custom - # hook to do that ALWAYS. - enc_hook=enc_nsp if add_hooks else None, - - # XXX NOTE: pretty sure this is mutex with the `type=` to - # `Decoder`? so it won't work in tandem with the - # `ipc_pld_spec` passed above? - ext_types=[NamespacePath], - ) - # dec_hook=dec_nsp if add_hooks else None, - return nsp_codec - - -def chk_codec_applied( - expect_codec: MsgCodec, - enter_value: MsgCodec|None = None, - -) -> MsgCodec: - ''' - buncha sanity checks ensuring that the IPC channel's - context-vars are set to the expected codec and that are - ctx-var wrapper APIs match the same. - - ''' - # TODO: play with tricyle again, bc this is supposed to work - # the way we want? 
- # - # TreeVar - # task: trio.Task = trio.lowlevel.current_task() - # curr_codec = _ctxvar_MsgCodec.get_in(task) - - # ContextVar - # task_ctx: Context = task.context - # assert _ctxvar_MsgCodec in task_ctx - # curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec] - - # NOTE: currently we use this! - # RunVar - curr_codec: MsgCodec = current_codec() - last_read_codec = _ctxvar_MsgCodec.get() - # assert curr_codec is last_read_codec - - assert ( - (same_codec := expect_codec) is - # returned from `mk_codec()` - - # yielded value from `apply_codec()` - - # read from current task's `contextvars.Context` - curr_codec is - last_read_codec - - # the default `msgspec` settings - is not _codec._def_msgspec_codec - is not _codec._def_tractor_codec - ) - - if enter_value: - enter_value is same_codec - - -def iter_maybe_sends( - send_items: dict[Union[Type], Any] | list[tuple], - ipc_pld_spec: Union[Type] | Any, - add_codec_hooks: bool, - - codec: MsgCodec|None = None, - -) -> tuple[Any, bool]: - - if isinstance(send_items, dict): - send_items = send_items.items() - - for ( - send_type_spec, - send_value, - ) in send_items: - - expect_roundtrip: bool = False - - # values-to-typespec santiy - send_type = type(send_value) - assert send_type == send_type_spec or ( - (subtypes := getattr(send_type_spec, '__args__', None)) - and send_type in subtypes - ) - - spec_subtypes: set[Union[Type]] = ( - getattr( - ipc_pld_spec, - '__args__', - {ipc_pld_spec,}, - ) - ) - send_in_spec: bool = ( - send_type == ipc_pld_spec - or ( - ipc_pld_spec != Any - and # presume `Union` of types - send_type in spec_subtypes - ) - or ( - ipc_pld_spec == Any - and - send_type != NamespacePath - ) - ) - expect_roundtrip = ( - send_in_spec - # any spec should support all other - # builtin py values that we send - # except our custom nsp type which - # we should be able to send as long - # as we provide the custom codec hooks. 
- or ( - ipc_pld_spec == Any - and - send_type == NamespacePath - and - add_codec_hooks - ) - ) - - if codec is not None: - # XXX FIRST XXX ensure roundtripping works - # before touching any IPC primitives/APIs. - wire_bytes: bytes = codec.encode( - Started( - cid='blahblah', - pld=send_value, - ) - ) - # NOTE: demonstrates the decoder loading - # to via our native SCIPP msg-spec - # (structurred-conc-inter-proc-protocol) - # implemented as per, - try: - msg: Started = codec.decode(wire_bytes) - if not expect_roundtrip: - pytest.fail( - f'NOT-EXPECTED able to roundtrip value given spec:\n' - f'ipc_pld_spec -> {ipc_pld_spec}\n' - f'value -> {send_value}: {send_type}\n' - ) - - pld = msg.pld - assert pld == send_value - - except ValidationError: - if expect_roundtrip: - pytest.fail( - f'EXPECTED to roundtrip value given spec:\n' - f'ipc_pld_spec -> {ipc_pld_spec}\n' - f'value -> {send_value}: {send_type}\n' - ) - - yield ( - str(send_type), - send_value, - expect_roundtrip, - ) - - -@tractor.context -async def send_back_values( - ctx: Context, - expect_debug: bool, - pld_spec_type_strs: list[str], - add_hooks: bool, - # started_msg_bytes: bytes, - expect_ipc_send: dict[str, tuple[Any, bool]], - -) -> None: - ''' - Setup up a custom codec to load instances of `NamespacePath` - and ensure we can round trip a func ref with our parent. - - ''' - uid: tuple = tractor.current_actor().uid - - # debug mode sanity check (prolly superfluous but, meh) - assert expect_debug == _state.debug_mode() - - # init state in sub-actor should be default - chk_codec_applied( - expect_codec=_codec._def_tractor_codec, - ) - - # load pld spec from input str - ipc_pld_spec = _exts.dec_type_union( - pld_spec_type_strs, - ) - pld_spec_str = str(ipc_pld_spec) - - # same as on parent side config. 
- nsp_codec: MsgCodec = mk_custom_codec( - add_hooks=add_hooks, - ) - with ( - apply_codec(nsp_codec) as codec, - limit_plds(ipc_pld_spec) as codec, - ): - # we SHOULD NOT be swapping the global codec since it breaks - # `Context.starte()` roundtripping checks! - chk_codec_applied( - expect_codec=nsp_codec, - ) - # XXX SO NOT THIS! - # chk_codec_applied( - # expect_codec=nsp_codec, - # enter_value=codec, - # ) - - print( - f'{uid}: attempting `Started`-bytes DECODE..\n' - ) - try: - # msg: Started = nsp_codec.decode(started_msg_bytes) - - ipc_spec: Type = ctx._pld_rx._pld_dec.spec - expected_pld_spec_str: str = str(ipc_spec) - assert ( - pld_spec_str == expected_pld_spec_str - and - ipc_pld_spec == ipc_spec - ) - - # TODO: maybe we should add our own wrapper error so as to - # be interchange-lib agnostic? - # -[ ] the error type is wtv is raised from the hook so we - # could also require a type-class of errors for - # indicating whether the hook-failure can be handled by - # a nasty-dialog-unprot sub-sys? - except ValidationError: - - # NOTE: only in the `Any` spec case do we expect this to - # work since otherwise no spec covers a plain-ol' - # `.pld: str` - if pld_spec_str == 'Any': - raise - else: - print( - f'{uid}: (correctly) unable to DECODE `Started`-bytes\n' - # f'{started_msg_bytes}\n' - ) - - iter_send_val_items = iter(expect_ipc_send.values()) - sent: list[Any] = [] - for ( - send_value, - expect_send, - ) in iter_send_val_items: - try: - print( - f'{uid}: attempting to `.started({send_value})`\n' - f'=> expect_send: {expect_send}\n' - f'SINCE, ipc_pld_spec: {ipc_pld_spec}\n' - f'AND, codec: {codec}\n' - ) - await ctx.started(send_value) - sent.append(send_value) - if not expect_send: - - # XXX NOTE XXX THIS WON'T WORK WITHOUT SPECIAL - # `str` handling! or special debug mode IPC - # msgs! 
- await tractor.pause() - - raise RuntimeError( - f'NOT-EXPECTED able to roundtrip value given spec:\n' - f'ipc_pld_spec -> {ipc_pld_spec}\n' - f'value -> {send_value}: {type(send_value)}\n' - ) - - break # move on to streaming block.. - - except tractor.MsgTypeError as _mte: - mte = _mte - - if expect_send: - raise RuntimeError( - f'EXPECTED to `.started()` value given spec ??\n\n' - f'ipc_pld_spec -> {ipc_pld_spec}\n' - f'value -> {send_value}: {type(send_value)}\n' - ) - - # await tractor.pause() - raise mte - - - async with ctx.open_stream() as ipc: - print( - f'{uid}: Entering streaming block to send remaining values..' - ) - - for send_value, expect_send in iter_send_val_items: - send_type: Type = type(send_value) - print( - '------ - ------\n' - f'{uid}: SENDING NEXT VALUE\n' - f'ipc_pld_spec: {ipc_pld_spec}\n' - f'expect_send: {expect_send}\n' - f'val: {send_value}\n' - '------ - ------\n' - ) - try: - await ipc.send(send_value) - print(f'***\n{uid}-CHILD sent {send_value!r}\n***\n') - sent.append(send_value) - - # NOTE: should only raise above on - # `.started()` or a `Return` - # if not expect_send: - # raise RuntimeError( - # f'NOT-EXPECTED able to roundtrip value given spec:\n' - # f'ipc_pld_spec -> {ipc_pld_spec}\n' - # f'value -> {send_value}: {send_type}\n' - # ) - - except ValidationError: - print(f'{uid} FAILED TO SEND {send_value}!') - - # await tractor.pause() - if expect_send: - raise RuntimeError( - f'EXPECTED to roundtrip value given spec:\n' - f'ipc_pld_spec -> {ipc_pld_spec}\n' - f'value -> {send_value}: {send_type}\n' - ) - # continue - - else: - print( - f'{uid}: finished sending all values\n' - 'Should be exiting stream block!\n' - ) - - print(f'{uid}: exited streaming block!') - - # TODO: this won't be true bc in streaming phase we DO NOT - # msgspec check outbound msgs! - # -[ ] once we implement the receiver side `InvalidMsg` - # then we can expect it here? 
- # assert ( - # len(sent) - # == - # len([val - # for val, expect in - # expect_ipc_send.values() - # if expect is True]) - # ) - - -@pytest.mark.parametrize( - 'ipc_pld_spec', - [ - Any, - NamespacePath, - NamespacePath|None, # the "maybe" spec Bo - ], - ids=[ - 'any_type', - 'nsp_type', - 'maybe_nsp_type', - ] -) -@pytest.mark.parametrize( - 'add_codec_hooks', - [ - True, - False, - ], - ids=['use_codec_hooks', 'no_codec_hooks'], -) -def test_codec_hooks_mod( - debug_mode: bool, - ipc_pld_spec: Union[Type]|Any, - # send_value: None|str|NamespacePath, - add_codec_hooks: bool, -): - ''' - Audit the `.msg.MsgCodec` override apis details given our impl - uses `contextvars` to accomplish per `trio` task codec - application around an inter-proc-task-comms context. - - ''' - async def main(): - nsp = NamespacePath.from_ref(ex_func) - send_items: dict[Union, Any] = { - Union[None]: None, - Union[NamespacePath]: nsp, - Union[str]: str(nsp), - } - - # init default state for actor - chk_codec_applied( - expect_codec=_codec._def_tractor_codec, - ) - - async with tractor.open_nursery( - debug_mode=debug_mode, - ) as an: - p: tractor.Portal = await an.start_actor( - 'sub', - enable_modules=[__name__], - ) - - # TODO: 2 cases: - # - codec not modified -> decode nsp as `str` - # - codec modified with hooks -> decode nsp as - # `NamespacePath` - nsp_codec: MsgCodec = mk_custom_codec( - add_hooks=add_codec_hooks, - ) - with apply_codec(nsp_codec) as codec: - chk_codec_applied( - expect_codec=nsp_codec, - enter_value=codec, - ) - - expect_ipc_send: dict[str, tuple[Any, bool]] = {} - - report: str = ( - 'Parent report on send values with\n' - f'ipc_pld_spec: {ipc_pld_spec}\n' - ' ------ - ------\n' - ) - for ( - val_type_str, - val, - expect_send, - )in iter_maybe_sends( - send_items, - ipc_pld_spec, - add_codec_hooks=add_codec_hooks, - ): - report += ( - f'send_value: {val}: {type(val)} ' - f'=> expect_send: {expect_send}\n' - ) - expect_ipc_send[val_type_str] = ( - val, - 
expect_send, - ) - - print( - report + - ' ------ - ------\n' - ) - assert len(expect_ipc_send) == len(send_items) - # now try over real IPC with a the subactor - # expect_ipc_rountrip: bool = True - - if ( - subtypes := getattr( - ipc_pld_spec, '__args__', False - ) - ): - pld_types_str: str = '|'.join(subtypes) - # breakpoint() - else: - # TODO, use `.msg._exts` utils instead of this! - pld_types_str: str = ipc_pld_spec.__name__ - - expected_started = Started( - cid='cid', - # pld=str(pld_types_str), - pld=ipc_pld_spec, - ) - started_msg_bytes: bytes = nsp_codec.encode( - expected_started, - ) - # build list of values we expect to receive from - # the subactor. - expect_to_send: list[Any] = [ - val - for val, expect_send in expect_ipc_send.values() - if expect_send - ] - - pld_spec_type_strs: list[str] = _exts.enc_type_union(ipc_pld_spec) - - # XXX should raise an mte (`MsgTypeError`) - # when `add_codec_hooks == False` bc the input - # `expect_ipc_send` kwarg has a nsp which can't be - # serialized! - # - # TODO:can we ensure this happens from the - # `Return`-side (aka the sub) as well? - if not add_codec_hooks: - try: - async with p.open_context( - send_back_values, - expect_debug=debug_mode, - pld_spec_type_strs=pld_spec_type_strs, - add_hooks=add_codec_hooks, - started_msg_bytes=started_msg_bytes, - - # XXX NOTE bc we send a `NamespacePath` in this kwarg - expect_ipc_send=expect_ipc_send, - - ) as (ctx, first): - pytest.fail('ctx should fail to open without custom enc_hook!?') - - # this test passes bc we can go no further! - except MsgTypeError: - # teardown nursery - await p.cancel_actor() - return - - # TODO: send the original nsp here and - # test with `limit_msg_spec()` above? - # await tractor.pause() - print('PARENT opening IPC ctx!\n') - ctx: tractor.Context - ipc: tractor.MsgStream - async with ( - - # XXX should raise an mte (`MsgTypeError`) - # when `add_codec_hooks == False`.. 
- p.open_context( - send_back_values, - expect_debug=debug_mode, - pld_spec_type_strs=pld_spec_type_strs, - add_hooks=add_codec_hooks, - started_msg_bytes=nsp_codec.encode(expected_started), - expect_ipc_send=expect_ipc_send, - ) as (ctx, first), - - ctx.open_stream() as ipc, - ): - # ensure codec is still applied across - # `tractor.Context` + its embedded nursery. - chk_codec_applied( - expect_codec=nsp_codec, - enter_value=codec, - ) - print( - 'root: ENTERING CONTEXT BLOCK\n' - f'type(first): {type(first)}\n' - f'first: {first}\n' - ) - expect_to_send.remove(first) - - # TODO: explicit values we expect depending on - # codec config! - # assert first == first_val - # assert first == f'{__name__}:ex_func' - - async for next_sent in ipc: - print( - 'Parent: child sent next value\n' - f'{next_sent}: {type(next_sent)}\n' - ) - if expect_to_send: - expect_to_send.remove(next_sent) - else: - print('PARENT should terminate stream loop + block!') - - # all sent values should have arrived! - assert not expect_to_send - - await p.cancel_actor() - - trio.run(main) - - -def chk_pld_type( - payload_spec: Type[Struct]|Any, - pld: Any, - - expect_roundtrip: bool|None = None, - -) -> bool: - - pld_val_type: Type = type(pld) - - # TODO: verify that the overridden subtypes - # DO NOT have modified type-annots from original! - # 'Start', .pld: FuncSpec - # 'StartAck', .pld: IpcCtxSpec - # 'Stop', .pld: UNSEt - # 'Error', .pld: ErrorData - - codec: MsgCodec = mk_codec( - # NOTE: this ONLY accepts `PayloadMsg.pld` fields of a specified - # type union. 
- ipc_pld_spec=payload_spec, - ) - - # make a one-off dec to compare with our `MsgCodec` instance - # which does the below `mk_msg_spec()` call internally - ipc_msg_spec: Union[Type[Struct]] - msg_types: list[PayloadMsg[payload_spec]] - ( - ipc_msg_spec, - msg_types, - ) = mk_msg_spec( - payload_type_union=payload_spec, - ) - _enc = msgpack.Encoder() - _dec = msgpack.Decoder( - type=ipc_msg_spec or Any, # like `PayloadMsg[Any]` - ) - - assert ( - payload_spec - == - codec.pld_spec - ) - - # assert codec.dec == dec - # - # ^-XXX-^ not sure why these aren't "equal" but when cast - # to `str` they seem to match ?? .. kk - - assert ( - str(ipc_msg_spec) - == - str(codec.msg_spec) - == - str(_dec.type) - == - str(codec.dec.type) - ) - - # verify the boxed-type for all variable payload-type msgs. - if not msg_types: - breakpoint() - - roundtrip: bool|None = None - pld_spec_msg_names: list[str] = [ - td.__name__ for td in _payload_msgs - ] - for typedef in msg_types: - - skip_runtime_msg: bool = typedef.__name__ not in pld_spec_msg_names - if skip_runtime_msg: - continue - - pld_field = structs.fields(typedef)[1] - assert pld_field.type is payload_spec # TODO-^ does this need to work to get all subtypes to adhere? - - kwargs: dict[str, Any] = { - 'cid': '666', - 'pld': pld, - } - enc_msg: PayloadMsg = typedef(**kwargs) - - _wire_bytes: bytes = _enc.encode(enc_msg) - wire_bytes: bytes = codec.enc.encode(enc_msg) - assert _wire_bytes == wire_bytes - - ve: ValidationError|None = None - try: - dec_msg = codec.dec.decode(wire_bytes) - _dec_msg = _dec.decode(wire_bytes) - - # decoded msg and thus payload should be exactly same! 
- assert (roundtrip := ( - _dec_msg - == - dec_msg - == - enc_msg - )) - - if ( - expect_roundtrip is not None - and expect_roundtrip != roundtrip - ): - breakpoint() - - assert ( - pld - == - dec_msg.pld - == - enc_msg.pld - ) - # assert (roundtrip := (_dec_msg == enc_msg)) - - except ValidationError as _ve: - ve = _ve - roundtrip: bool = False - if pld_val_type is payload_spec: - raise ValueError( - 'Got `ValidationError` despite type-var match!?\n' - f'pld_val_type: {pld_val_type}\n' - f'payload_type: {payload_spec}\n' - ) from ve - - else: - # ow we good cuz the pld spec mismatched. - print( - 'Got expected `ValidationError` since,\n' - f'{pld_val_type} is not {payload_spec}\n' - ) - else: - if ( - payload_spec is not Any - and - pld_val_type is not payload_spec - ): - raise ValueError( - 'DID NOT `ValidationError` despite expected type match!?\n' - f'pld_val_type: {pld_val_type}\n' - f'payload_type: {payload_spec}\n' - ) - - # full code decode should always be attempted! - if roundtrip is None: - breakpoint() - - return roundtrip - - -# ?TODO? remove since covered in the newer `test_pldrx_limiting`? -def test_limit_msgspec( - debug_mode: bool, -): - ''' - Internals unit testing to verify that type-limiting an IPC ctx's - msg spec with `Pldrx.limit_plds()` results in various - encapsulated `msgspec` object settings and state. - - ''' - async def main(): - async with tractor.open_root_actor( - debug_mode=debug_mode, - ): - # ensure we can round-trip a boxing `PayloadMsg` - assert chk_pld_type( - payload_spec=Any, - pld=None, - expect_roundtrip=True, - ) - - # verify that a mis-typed payload value won't decode - assert not chk_pld_type( - payload_spec=int, - pld='doggy', - ) - - # parametrize the boxed `.pld` type as a custom-struct - # and ensure that parametrization propagates - # to all payload-msg-spec-able subtypes! 
- class CustomPayload(Struct): - name: str - value: Any - - assert not chk_pld_type( - payload_spec=CustomPayload, - pld='doggy', - ) - - assert chk_pld_type( - payload_spec=CustomPayload, - pld=CustomPayload(name='doggy', value='urmom') - ) - - # yah, we can `.pause_from_sync()` now! - # breakpoint() - - trio.run(main) - - def enc_nsp(obj: Any) -> Any: actor: Actor = tractor.current_actor( err_on_no_runtime=False, @@ -1034,3 +240,816 @@ def test_custom_extension_types( except TypeError: if not add_codec_hooks: pass + +@tractor.context +async def sleep_forever_in_sub( + ctx: Context, +) -> None: + await trio.sleep_forever() + + +def mk_custom_codec( + add_hooks: bool, + +) -> tuple[ + MsgCodec, # encode to send + MsgDec, # pld receive-n-decode +]: + ''' + Create custom `msgpack` enc/dec-hooks and set a `Decoder` + which only loads `pld_spec` (like `NamespacePath`) types. + + ''' + + # XXX NOTE XXX: despite defining `NamespacePath` as a type + # field on our `PayloadMsg.pld`, we still need a enc/dec_hook() pair + # to cast to/from that type on the wire. See the docs: + # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types + + # if pld_spec is Any: + # pld_spec = Raw + + nsp_codec: MsgCodec = mk_codec( + # ipc_pld_spec=Raw, # default! + + # NOTE XXX: the encode hook MUST be used no matter what since + # our `NamespacePath` is not any of a `Any` native type nor + # a `msgspec.Struct` subtype - so `msgspec` has no way to know + # how to encode it unless we provide the custom hook. + # + # AGAIN that is, regardless of whether we spec an + # `Any`-decoded-pld the enc has no knowledge (by default) + # how to enc `NamespacePath` (nsp), so we add a custom + # hook to do that ALWAYS. + enc_hook=enc_nsp if add_hooks else None, + + # XXX NOTE: pretty sure this is mutex with the `type=` to + # `Decoder`? so it won't work in tandem with the + # `ipc_pld_spec` passed above? 
+ ext_types=[NamespacePath], + ) + # dec_hook=dec_nsp if add_hooks else None, + return nsp_codec + + +@pytest.mark.parametrize( + 'limit_plds_args', + [ + ( + {'dec_hook': None, 'ext_types': None}, + None, + ), + ( + {'dec_hook': dec_nsp, 'ext_types': None}, + TypeError, + ), + ( + {'dec_hook': dec_nsp, 'ext_types': [NamespacePath]}, + None, + ), + ( + {'dec_hook': dec_nsp, 'ext_types': [NamespacePath|None]}, + None, + ), + ], + ids=[ + 'no_hook_no_ext_types', + 'only_hook', + 'hook_and_ext_types', + 'hook_and_ext_types_w_null', + ] +) +def test_pld_limiting_usage( + limit_plds_args: tuple[dict, Exception|None], +): + ''' + Verify `dec_hook()` and `ext_types` need to either both be provided + or we raise a explanator type-error. + + ''' + kwargs, maybe_err = limit_plds_args + async def main(): + async with tractor.open_nursery() as an: # just to open runtime + + # XXX SHOULD NEVER WORK outside an ipc ctx scope! + try: + with limit_plds(**kwargs): + pass + except RuntimeError: + pass + + p: tractor.Portal = await an.start_actor( + 'sub', + enable_modules=[__name__], + ) + async with ( + p.open_context( + sleep_forever_in_sub + ) as (ctx, first), + ): + try: + with limit_plds(**kwargs): + pass + except maybe_err as exc: + assert type(exc) is maybe_err + pass + + +def chk_codec_applied( + expect_codec: MsgCodec|None, + enter_value: MsgCodec|None = None, + +) -> MsgCodec: + ''' + buncha sanity checks ensuring that the IPC channel's + context-vars are set to the expected codec and that are + ctx-var wrapper APIs match the same. + + ''' + # TODO: play with tricyle again, bc this is supposed to work + # the way we want? 
+ # + # TreeVar + # task: trio.Task = trio.lowlevel.current_task() + # curr_codec = _ctxvar_MsgCodec.get_in(task) + + # ContextVar + # task_ctx: Context = task.context + # assert _ctxvar_MsgCodec in task_ctx + # curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec] + if expect_codec is None: + assert enter_value is None + return + + # NOTE: currently we use this! + # RunVar + curr_codec: MsgCodec = current_codec() + last_read_codec = _ctxvar_MsgCodec.get() + # assert curr_codec is last_read_codec + + assert ( + (same_codec := expect_codec) is + # returned from `mk_codec()` + + # yielded value from `apply_codec()` + + # read from current task's `contextvars.Context` + curr_codec is + last_read_codec + + # the default `msgspec` settings + is not _codec._def_msgspec_codec + is not _codec._def_tractor_codec + ) + + if enter_value: + assert enter_value is same_codec + + +# def iter_maybe_sends( +# send_items: dict[Union[Type], Any] | list[tuple], +# ipc_pld_spec: Union[Type] | Any, +# add_codec_hooks: bool, + +# codec: MsgCodec|None = None, + +# ) -> tuple[Any, bool]: + +# if isinstance(send_items, dict): +# send_items = send_items.items() + +# for ( +# send_type_spec, +# send_value, +# ) in send_items: + +# expect_roundtrip: bool = False + +# # values-to-typespec santiy +# send_type = type(send_value) +# assert send_type == send_type_spec or ( +# (subtypes := getattr(send_type_spec, '__args__', None)) +# and send_type in subtypes +# ) + +# spec_subtypes: set[Union[Type]] = ( +# getattr( +# ipc_pld_spec, +# '__args__', +# {ipc_pld_spec,}, +# ) +# ) +# send_in_spec: bool = ( +# send_type == ipc_pld_spec +# or ( +# ipc_pld_spec != Any +# and # presume `Union` of types +# send_type in spec_subtypes +# ) +# or ( +# ipc_pld_spec == Any +# and +# send_type != NamespacePath +# ) +# ) +# expect_roundtrip = ( +# send_in_spec +# # any spec should support all other +# # builtin py values that we send +# # except our custom nsp type which +# # we should be able to send as long +# # 
as we provide the custom codec hooks. +# or ( +# ipc_pld_spec == Any +# and +# send_type == NamespacePath +# and +# add_codec_hooks +# ) +# ) + +# if codec is not None: +# # XXX FIRST XXX ensure roundtripping works +# # before touching any IPC primitives/APIs. +# wire_bytes: bytes = codec.encode( +# Started( +# cid='blahblah', +# pld=send_value, +# ) +# ) +# # NOTE: demonstrates the decoder loading +# # to via our native SCIPP msg-spec +# # (structurred-conc-inter-proc-protocol) +# # implemented as per, +# try: +# msg: Started = codec.decode(wire_bytes) +# if not expect_roundtrip: +# pytest.fail( +# f'NOT-EXPECTED able to roundtrip value given spec:\n' +# f'ipc_pld_spec -> {ipc_pld_spec}\n' +# f'value -> {send_value}: {send_type}\n' +# ) + +# pld = msg.pld +# assert pld == send_value + +# except ValidationError: +# if expect_roundtrip: +# pytest.fail( +# f'EXPECTED to roundtrip value given spec:\n' +# f'ipc_pld_spec -> {ipc_pld_spec}\n' +# f'value -> {send_value}: {send_type}\n' +# ) + +# yield ( +# str(send_type), +# send_value, +# expect_roundtrip, +# ) + + +@tractor.context +async def send_back_values( + ctx: Context, + rent_pld_spec_type_strs: list[str], + add_hooks: bool, + # expect_ipc_send: dict[str, tuple[Any, bool]], + + # expect_debug: bool, + # started_msg_bytes: bytes, + +) -> None: + ''' + Setup up a custom codec to load instances of `NamespacePath` + and ensure we can round trip a func ref with our parent. + + ''' + uid: tuple = tractor.current_actor().uid + + # init state in sub-actor should be default + chk_codec_applied( + expect_codec=_codec._def_tractor_codec, + ) + + # load pld spec from input str + rent_pld_spec = _exts.dec_type_union( + rent_pld_spec_type_strs, + mods=[ + importlib.import_module(__name__), + ], + ) + rent_pld_spec_types: set[Type] = _codec.unpack_spec_types( + rent_pld_spec, + ) + + # ONLY add ext-hooks if the rent specified a non-std type! 
+ add_hooks: bool = ( + NamespacePath in rent_pld_spec_types + ) + + # same as on parent side config. + nsp_codec: MsgCodec|None = None + if add_hooks: + nsp_codec = mk_codec( + enc_hook=enc_nsp, + ext_types=[NamespacePath], + ) + + with ( + maybe_apply_codec(nsp_codec) as codec, + limit_plds( + rent_pld_spec, + dec_hook=dec_nsp if add_hooks else None, + ext_types=[NamespacePath] if add_hooks else None, + ) as pld_dec, + ): + # ?XXX? SHOULD WE NOT be swapping the global codec since it + # breaks `Context.started()` roundtripping checks?? + chk_codec_applied( + expect_codec=nsp_codec, + enter_value=codec, + ) + + # ?TODO, mismatch case(s)? + # + # ensure pld spec matches on both sides + ctx_pld_dec: MsgDec = ctx._pld_rx._pld_dec + assert pld_dec is ctx_pld_dec + child_pld_spec: Type = pld_dec.spec + child_pld_spec_types: set[Type] = _codec.unpack_spec_types( + child_pld_spec, + ) + assert ( + # child_pld_spec == rent_pld_spec + child_pld_spec_types.issuperset( + rent_pld_spec_types + ) + ) + + # expected_pld_spec_str: str = str(ipc_spec) + # assert ( + # pld_spec_str == expected_pld_spec_str + # and + # ipc_pld_spec == ipc_spec + # ) + + # ?TODO, try loop for each of the types in pld-superset? + # + # for send_value in [ + # nsp, + # str(nsp), + # None, + # ]: + nsp = NamespacePath.from_ref(ex_func) + try: + print( + f'{uid}: attempting to `.started({nsp})`\n' + f'\n' + f'rent_pld_spec: {rent_pld_spec}\n' + f'child_pld_spec: {child_pld_spec}\n' + f'codec: {codec}\n' + ) + await ctx.started(nsp) + + except tractor.MsgTypeError as _mte: + mte = _mte + + # false -ve case + if add_hooks: + raise RuntimeError( + f'EXPECTED to `.started()` value given spec ??\n\n' + f'child_pld_spec -> {child_pld_spec}\n' + f'value = {nsp}: {type(nsp)}\n' + ) + + # true -ve case + raise mte + + # TODO: maybe we should add our own wrapper error so as to + # be interchange-lib agnostic? 
+ # -[ ] the error type is wtv is raised from the hook so we + # could also require a type-class of errors for + # indicating whether the hook-failure can be handled by + # a nasty-dialog-unprot sub-sys? + except TypeError as typerr: + # false -ve + if add_hooks: + raise RuntimeError('Should have been able to send `nsp`??') + + # true -ve + print('Failed to send `nsp` due to no ext hooks set!') + raise typerr + + # now try sending a set of valid and invalid plds to ensure + # the pld spec is respected. + sent: list[Any] = [] + async with ctx.open_stream() as ipc: + print( + f'{uid}: streaming all pld types to rent..' + ) + + # for send_value, expect_send in iter_send_val_items: + for send_value in [ + nsp, + str(nsp), + None, + ]: + send_type: Type = type(send_value) + print( + f'{uid}: SENDING NEXT pld\n' + f'send_type: {send_type}\n' + f'send_value: {send_value}\n' + ) + try: + await ipc.send(send_value) + sent.append(send_value) + + except ValidationError as valerr: + print(f'{uid} FAILED TO SEND {send_value}!') + + # false -ve + if add_hooks: + raise RuntimeError( + f'EXPECTED to roundtrip value given spec:\n' + f'rent_pld_spec -> {rent_pld_spec}\n' + f'child_pld_spec -> {child_pld_spec}\n' + f'value = {send_value}: {send_type}\n' + ) + + # true -ve + raise valerr + # continue + + else: + print( + f'{uid}: finished sending all values\n' + 'Should be exiting stream block!\n' + ) + + print(f'{uid}: exited streaming block!') + + + +@cm +def maybe_apply_codec(codec: MsgCodec|None) -> MsgCodec|None: + if codec is None: + yield None + return + + with apply_codec(codec) as codec: + yield codec + + +@pytest.mark.parametrize( + 'pld_spec', + [ + Any, + NamespacePath, + NamespacePath|None, # the "maybe" spec Bo + ], + ids=[ + 'any_type', + 'only_nsp_ext', + 'maybe_nsp_ext', + ] +) +@pytest.mark.parametrize( + 'add_hooks', + [ + True, + False, + ], + ids=[ + 'use_codec_hooks', + 'no_codec_hooks', + ], +) +def test_ext_types_over_ipc( + debug_mode: bool, + pld_spec: 
Union[Type], + add_hooks: bool, +): + ''' + Ensure we can support extension types coverted using + `enc/dec_hook()`s passed to the `.msg.limit_plds()` API + and that sane errors happen when we try do the same without + the codec hooks. + + ''' + pld_types: set[Type] = _codec.unpack_spec_types(pld_spec) + + async def main(): + + # sanity check the default pld-spec beforehand + chk_codec_applied( + expect_codec=_codec._def_tractor_codec, + ) + + # extension type we want to send as msg payload + nsp = NamespacePath.from_ref(ex_func) + + # ^NOTE, 2 cases: + # - codec hooks noto added -> decode nsp as `str` + # - codec with hooks -> decode nsp as `NamespacePath` + nsp_codec: MsgCodec|None = None + if ( + NamespacePath in pld_types + and + add_hooks + ): + nsp_codec = mk_codec( + enc_hook=enc_nsp, + ext_types=[NamespacePath], + ) + + async with tractor.open_nursery( + debug_mode=debug_mode, + ) as an: + p: tractor.Portal = await an.start_actor( + 'sub', + enable_modules=[__name__], + ) + with ( + maybe_apply_codec(nsp_codec) as codec, + ): + chk_codec_applied( + expect_codec=nsp_codec, + enter_value=codec, + ) + rent_pld_spec_type_strs: list[str] = _exts.enc_type_union(pld_spec) + + # XXX should raise an mte (`MsgTypeError`) + # when `add_hooks == False` bc the input + # `expect_ipc_send` kwarg has a nsp which can't be + # serialized! + # + # TODO:can we ensure this happens from the + # `Return`-side (aka the sub) as well? + try: + ctx: tractor.Context + ipc: tractor.MsgStream + async with ( + + # XXX should raise an mte (`MsgTypeError`) + # when `add_hooks == False`.. 
+ p.open_context( + send_back_values, + # expect_debug=debug_mode, + rent_pld_spec_type_strs=rent_pld_spec_type_strs, + add_hooks=add_hooks, + # expect_ipc_send=expect_ipc_send, + ) as (ctx, first), + + ctx.open_stream() as ipc, + ): + with ( + limit_plds( + pld_spec, + dec_hook=dec_nsp if add_hooks else None, + ext_types=[NamespacePath] if add_hooks else None, + ) as pld_dec, + ): + ctx_pld_dec: MsgDec = ctx._pld_rx._pld_dec + assert pld_dec is ctx_pld_dec + + # if ( + # not add_hooks + # and + # NamespacePath in + # ): + # pytest.fail('ctx should fail to open without custom enc_hook!?') + + await ipc.send(nsp) + nsp_rt = await ipc.receive() + + assert nsp_rt == nsp + assert nsp_rt.load_ref() is ex_func + + # this test passes bc we can go no further! + except MsgTypeError: + if not add_hooks: + # teardown nursery + await p.cancel_actor() + return + + await p.cancel_actor() + + if ( + NamespacePath in pld_types + and + add_hooks + ): + trio.run(main) + + else: + with pytest.raises( + expected_exception=tractor.RemoteActorError, + ) as excinfo: + trio.run(main) + + exc = excinfo.value + # bc `.started(nsp: NamespacePath)` will raise + assert exc.boxed_type is TypeError + + +# def chk_pld_type( +# payload_spec: Type[Struct]|Any, +# pld: Any, + +# expect_roundtrip: bool|None = None, + +# ) -> bool: + +# pld_val_type: Type = type(pld) + +# # TODO: verify that the overridden subtypes +# # DO NOT have modified type-annots from original! +# # 'Start', .pld: FuncSpec +# # 'StartAck', .pld: IpcCtxSpec +# # 'Stop', .pld: UNSEt +# # 'Error', .pld: ErrorData + +# codec: MsgCodec = mk_codec( +# # NOTE: this ONLY accepts `PayloadMsg.pld` fields of a specified +# # type union. 
+# ipc_pld_spec=payload_spec, +# ) + +# # make a one-off dec to compare with our `MsgCodec` instance +# # which does the below `mk_msg_spec()` call internally +# ipc_msg_spec: Union[Type[Struct]] +# msg_types: list[PayloadMsg[payload_spec]] +# ( +# ipc_msg_spec, +# msg_types, +# ) = mk_msg_spec( +# payload_type_union=payload_spec, +# ) +# _enc = msgpack.Encoder() +# _dec = msgpack.Decoder( +# type=ipc_msg_spec or Any, # like `PayloadMsg[Any]` +# ) + +# assert ( +# payload_spec +# == +# codec.pld_spec +# ) + +# # assert codec.dec == dec +# # +# # ^-XXX-^ not sure why these aren't "equal" but when cast +# # to `str` they seem to match ?? .. kk + +# assert ( +# str(ipc_msg_spec) +# == +# str(codec.msg_spec) +# == +# str(_dec.type) +# == +# str(codec.dec.type) +# ) + +# # verify the boxed-type for all variable payload-type msgs. +# if not msg_types: +# breakpoint() + +# roundtrip: bool|None = None +# pld_spec_msg_names: list[str] = [ +# td.__name__ for td in _payload_msgs +# ] +# for typedef in msg_types: + +# skip_runtime_msg: bool = typedef.__name__ not in pld_spec_msg_names +# if skip_runtime_msg: +# continue + +# pld_field = structs.fields(typedef)[1] +# assert pld_field.type is payload_spec # TODO-^ does this need to work to get all subtypes to adhere? + +# kwargs: dict[str, Any] = { +# 'cid': '666', +# 'pld': pld, +# } +# enc_msg: PayloadMsg = typedef(**kwargs) + +# _wire_bytes: bytes = _enc.encode(enc_msg) +# wire_bytes: bytes = codec.enc.encode(enc_msg) +# assert _wire_bytes == wire_bytes + +# ve: ValidationError|None = None +# try: +# dec_msg = codec.dec.decode(wire_bytes) +# _dec_msg = _dec.decode(wire_bytes) + +# # decoded msg and thus payload should be exactly same! 
+# assert (roundtrip := ( +# _dec_msg +# == +# dec_msg +# == +# enc_msg +# )) + +# if ( +# expect_roundtrip is not None +# and expect_roundtrip != roundtrip +# ): +# breakpoint() + +# assert ( +# pld +# == +# dec_msg.pld +# == +# enc_msg.pld +# ) +# # assert (roundtrip := (_dec_msg == enc_msg)) + +# except ValidationError as _ve: +# ve = _ve +# roundtrip: bool = False +# if pld_val_type is payload_spec: +# raise ValueError( +# 'Got `ValidationError` despite type-var match!?\n' +# f'pld_val_type: {pld_val_type}\n' +# f'payload_type: {payload_spec}\n' +# ) from ve + +# else: +# # ow we good cuz the pld spec mismatched. +# print( +# 'Got expected `ValidationError` since,\n' +# f'{pld_val_type} is not {payload_spec}\n' +# ) +# else: +# if ( +# payload_spec is not Any +# and +# pld_val_type is not payload_spec +# ): +# raise ValueError( +# 'DID NOT `ValidationError` despite expected type match!?\n' +# f'pld_val_type: {pld_val_type}\n' +# f'payload_type: {payload_spec}\n' +# ) + +# # full code decode should always be attempted! +# if roundtrip is None: +# breakpoint() + +# return roundtrip + + +# ?TODO? maybe remove since covered in the newer `test_pldrx_limiting` +# via end-2-end testing of all this? +# -[ ] IOW do we really NEED this lowlevel unit testing? +# +# def test_limit_msgspec( +# debug_mode: bool, +# ): +# ''' +# Internals unit testing to verify that type-limiting an IPC ctx's +# msg spec with `Pldrx.limit_plds()` results in various +# encapsulated `msgspec` object settings and state. 
+ +# ''' +# async def main(): +# async with tractor.open_root_actor( +# debug_mode=debug_mode, +# ): +# # ensure we can round-trip a boxing `PayloadMsg` +# assert chk_pld_type( +# payload_spec=Any, +# pld=None, +# expect_roundtrip=True, +# ) + +# # verify that a mis-typed payload value won't decode +# assert not chk_pld_type( +# payload_spec=int, +# pld='doggy', +# ) + +# # parametrize the boxed `.pld` type as a custom-struct +# # and ensure that parametrization propagates +# # to all payload-msg-spec-able subtypes! +# class CustomPayload(Struct): +# name: str +# value: Any + +# assert not chk_pld_type( +# payload_spec=CustomPayload, +# pld='doggy', +# ) + +# assert chk_pld_type( +# payload_spec=CustomPayload, +# pld=CustomPayload(name='doggy', value='urmom') +# ) + +# # yah, we can `.pause_from_sync()` now! +# # breakpoint() + +# trio.run(main) -- 2.34.1 From 80fa61af61aa62363028509decb300fb09766b12 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 8 Mar 2025 23:58:31 -0500 Subject: [PATCH 07/19] Facepalm, fix logic misstep on child side Namely that `add_hooks: bool` should be the same as on the rent side.. Also, just drop the now unused `iter_maybe_sends`. This makes the suite entire greeeeen btw, including the new sub-suite which i hadn't runt before Bo --- tests/test_caps_based_msging.py | 133 +++----------------------------- 1 file changed, 12 insertions(+), 121 deletions(-) diff --git a/tests/test_caps_based_msging.py b/tests/test_caps_based_msging.py index 4d78a117..b334b64f 100644 --- a/tests/test_caps_based_msging.py +++ b/tests/test_caps_based_msging.py @@ -323,8 +323,8 @@ def test_pld_limiting_usage( limit_plds_args: tuple[dict, Exception|None], ): ''' - Verify `dec_hook()` and `ext_types` need to either both be provided - or we raise a explanator type-error. + Verify `dec_hook()` and `ext_types` need to either both be + provided or we raise a explanator type-error. 
''' kwargs, maybe_err = limit_plds_args @@ -406,117 +406,11 @@ def chk_codec_applied( assert enter_value is same_codec -# def iter_maybe_sends( -# send_items: dict[Union[Type], Any] | list[tuple], -# ipc_pld_spec: Union[Type] | Any, -# add_codec_hooks: bool, - -# codec: MsgCodec|None = None, - -# ) -> tuple[Any, bool]: - -# if isinstance(send_items, dict): -# send_items = send_items.items() - -# for ( -# send_type_spec, -# send_value, -# ) in send_items: - -# expect_roundtrip: bool = False - -# # values-to-typespec santiy -# send_type = type(send_value) -# assert send_type == send_type_spec or ( -# (subtypes := getattr(send_type_spec, '__args__', None)) -# and send_type in subtypes -# ) - -# spec_subtypes: set[Union[Type]] = ( -# getattr( -# ipc_pld_spec, -# '__args__', -# {ipc_pld_spec,}, -# ) -# ) -# send_in_spec: bool = ( -# send_type == ipc_pld_spec -# or ( -# ipc_pld_spec != Any -# and # presume `Union` of types -# send_type in spec_subtypes -# ) -# or ( -# ipc_pld_spec == Any -# and -# send_type != NamespacePath -# ) -# ) -# expect_roundtrip = ( -# send_in_spec -# # any spec should support all other -# # builtin py values that we send -# # except our custom nsp type which -# # we should be able to send as long -# # as we provide the custom codec hooks. -# or ( -# ipc_pld_spec == Any -# and -# send_type == NamespacePath -# and -# add_codec_hooks -# ) -# ) - -# if codec is not None: -# # XXX FIRST XXX ensure roundtripping works -# # before touching any IPC primitives/APIs. 
-# wire_bytes: bytes = codec.encode( -# Started( -# cid='blahblah', -# pld=send_value, -# ) -# ) -# # NOTE: demonstrates the decoder loading -# # to via our native SCIPP msg-spec -# # (structurred-conc-inter-proc-protocol) -# # implemented as per, -# try: -# msg: Started = codec.decode(wire_bytes) -# if not expect_roundtrip: -# pytest.fail( -# f'NOT-EXPECTED able to roundtrip value given spec:\n' -# f'ipc_pld_spec -> {ipc_pld_spec}\n' -# f'value -> {send_value}: {send_type}\n' -# ) - -# pld = msg.pld -# assert pld == send_value - -# except ValidationError: -# if expect_roundtrip: -# pytest.fail( -# f'EXPECTED to roundtrip value given spec:\n' -# f'ipc_pld_spec -> {ipc_pld_spec}\n' -# f'value -> {send_value}: {send_type}\n' -# ) - -# yield ( -# str(send_type), -# send_value, -# expect_roundtrip, -# ) - - @tractor.context async def send_back_values( ctx: Context, rent_pld_spec_type_strs: list[str], add_hooks: bool, - # expect_ipc_send: dict[str, tuple[Any, bool]], - - # expect_debug: bool, - # started_msg_bytes: bytes, ) -> None: ''' @@ -545,6 +439,8 @@ async def send_back_values( # ONLY add ext-hooks if the rent specified a non-std type! add_hooks: bool = ( NamespacePath in rent_pld_spec_types + and + add_hooks ) # same as on parent side config. @@ -580,19 +476,11 @@ async def send_back_values( child_pld_spec, ) assert ( - # child_pld_spec == rent_pld_spec child_pld_spec_types.issuperset( rent_pld_spec_types ) ) - # expected_pld_spec_str: str = str(ipc_spec) - # assert ( - # pld_spec_str == expected_pld_spec_str - # and - # ipc_pld_spec == ipc_spec - # ) - # ?TODO, try loop for each of the types in pld-superset? # # for send_value in [ @@ -609,6 +497,7 @@ async def send_back_values( f'child_pld_spec: {child_pld_spec}\n' f'codec: {codec}\n' ) + # await tractor.pause() await ctx.started(nsp) except tractor.MsgTypeError as _mte: @@ -826,11 +715,13 @@ def test_ext_types_over_ipc( assert nsp_rt.load_ref() is ex_func # this test passes bc we can go no further! 
- except MsgTypeError: - if not add_hooks: - # teardown nursery - await p.cancel_actor() - return + except MsgTypeError as mte: + # if not add_hooks: + # # teardown nursery + # await p.cancel_actor() + # return + + raise mte await p.cancel_actor() -- 2.34.1 From 305e1d03474935fd2dd42f71dbb1efe6403fca72 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 10 Mar 2025 11:17:46 -0400 Subject: [PATCH 08/19] Avoid attr-err when `._ipc_msg==None` Seems this can happen in particular when we raise a `MessageTypeError` on the sender side of a `Context`, since there isn't any msg relayed from the other side (though i'm wondering if MTE should derive from RAE then considering this case?). Means `RemoteActorError.boxed_type = None` in such cases instead of raising an attr-error for the `None.boxed_type_str`. --- tractor/_exceptions.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 249ea164..f9e18e18 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -432,9 +432,13 @@ class RemoteActorError(Exception): Error type boxed by last actor IPC hop. ''' - if self._boxed_type is None: + if ( + self._boxed_type is None + and + (ipc_msg := self._ipc_msg) + ): self._boxed_type = get_err_type( - self._ipc_msg.boxed_type_str + ipc_msg.boxed_type_str ) return self._boxed_type @@ -1143,6 +1147,8 @@ def unpack_error( which is the responsibilitiy of the caller. ''' + # XXX, apparently we pass all sorts of msgs here? + # kinda odd but seems like maybe they shouldn't be? 
if not isinstance(msg, Error): return None -- 2.34.1 From f3b10a8032d6629e416a66c7590c1888edb5632a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 11 Dec 2024 22:22:26 -0500 Subject: [PATCH 09/19] Support `ctx: UnionType` annots for `@tractor.context` eps --- tractor/_context.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tractor/_context.py b/tractor/_context.py index 4628b11f..5d6ccf69 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -47,6 +47,9 @@ from functools import partial import inspect from pprint import pformat import textwrap +from types import ( + UnionType, +) from typing import ( Any, AsyncGenerator, @@ -2548,7 +2551,14 @@ def context( name: str param: Type for name, param in annots.items(): - if param is Context: + if ( + param is Context + or ( + isinstance(param, UnionType) + and + Context in param.__args__ + ) + ): ctx_var_name: str = name break else: -- 2.34.1 From 3e1d18497f692a74c484f952ebbb00d950c92519 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 10 Mar 2025 18:17:31 -0400 Subject: [PATCH 10/19] Deliver a `MaybeBoxedError` from `.expect_ctxc()` Just like we do from the `.devx._debug.open_crash_handler()`, this allows checking various attrs on the raised `ContextCancelled` much like `with pytest.raises() as excinfo:`. 
--- tractor/_testing/__init__.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tractor/_testing/__init__.py b/tractor/_testing/__init__.py index 43507c33..88860d13 100644 --- a/tractor/_testing/__init__.py +++ b/tractor/_testing/__init__.py @@ -26,6 +26,9 @@ import os import pathlib import tractor +from tractor.devx._debug import ( + BoxedMaybeException, +) from .pytest import ( tractor_test as tractor_test ) @@ -98,12 +101,13 @@ async def expect_ctxc( ''' if yay: try: - yield + yield (maybe_exc := BoxedMaybeException()) raise RuntimeError('Never raised ctxc?') - except tractor.ContextCancelled: + except tractor.ContextCancelled as ctxc: + maybe_exc.value = ctxc if reraise: raise else: return else: - yield + yield (maybe_exc := BoxedMaybeException()) -- 2.34.1 From 4e5742a0567c0f57ebab90306e37eae6c832d134 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 11 Mar 2025 14:04:55 -0400 Subject: [PATCH 11/19] Extend ctx semantics suite for streaming edge cases! Muchas grax to @guilledk for finding the first issue which kicked of this further scrutiny of the `tractor.Context` and `MsgStream` semantics test suite with a strange edge case where, - if the parent opened and immediately closed a stream while the remote child task started and continued (without terminating) to send msgs the parent's `open_context().__aexit__()` would **not block** on the child to complete! => this was seemingly due to a bug discovered inside the `.msg._ops.drain_to_final_msg()` stream handling case logic where we are NOT checking if `Context._stream` is non-`None`! As such this, - extends the `test_caller_closes_ctx_after_callee_opens_stream` (now renamed, see below) to include cases for all combinations of the child and parent sending before receiving on the stream as well as all placements of `Context.cancel()` in the parent before, around and after the stream open. - uses the new `expect_ctxc()` for expecting the taskc (`trio.Task` cancelled)` cases. 
- also extends the `test_callee_closes_ctx_after_stream_open` (also renamed) to include the case where the parent sends a msg before it receives. => this case has unveiled yet-another-bug where somehow the underlying `MsgStream._rx_chan: trio.ReceiveMemoryChannel` is allowing the child's `Return[None]` msg be consumed and NOT in a place where it is correctly set as `Context._result` resulting in the parent hanging forever inside `._ops.drain_to_final_msg()`.. Alongside, - start renaming using the new "remote-task-peer-side" semantics throughout the test module: "caller" -> "parent", "callee" -> "child". --- tests/test_context_stream_semantics.py | 158 ++++++++++++++++++------- 1 file changed, 117 insertions(+), 41 deletions(-) diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index ade275aa..29e99b2e 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -443,7 +443,6 @@ def test_caller_cancels( @tractor.context async def close_ctx_immediately( - ctx: Context, ) -> None: @@ -454,10 +453,21 @@ async def close_ctx_immediately( async with ctx.open_stream(): pass + print('child returning!') + +@pytest.mark.parametrize( + 'parent_send_before_receive', + [ + False, + True, + ], + ids=lambda item: f'child_send_before_receive={item}' +) @tractor_test -async def test_callee_closes_ctx_after_stream_open( +async def test_child_exits_ctx_after_stream_open( debug_mode: bool, + parent_send_before_receive: bool, ): ''' callee context closes without using stream. @@ -474,6 +484,15 @@ async def test_callee_closes_ctx_after_stream_open( => {'stop': True, 'cid': } ''' + timeout: float = ( + 0.5 if ( + not debug_mode + # NOTE, for debugging final + # Return-consumed-n-discarded-ishue! 
+ # and + # not parent_send_before_receive + ) else 999 + ) async with tractor.open_nursery( debug_mode=debug_mode, ) as an: @@ -482,7 +501,7 @@ async def test_callee_closes_ctx_after_stream_open( enable_modules=[__name__], ) - with trio.fail_after(0.5): + with trio.fail_after(timeout): async with portal.open_context( close_ctx_immediately, @@ -494,41 +513,56 @@ async def test_callee_closes_ctx_after_stream_open( with trio.fail_after(0.4): async with ctx.open_stream() as stream: + if parent_send_before_receive: + print('sending first msg from parent!') + await stream.send('yo') # should fall through since ``StopAsyncIteration`` # should be raised through translation of # a ``trio.EndOfChannel`` by # ``trio.abc.ReceiveChannel.__anext__()`` - async for _ in stream: + msg = 10 + async for msg in stream: # trigger failure if we DO NOT # get an EOC! assert 0 else: + # never should get anythinig new from + # the underlying stream + assert msg == 10 # verify stream is now closed try: with trio.fail_after(0.3): + print('parent trying to `.receive()` on EoC stream!') await stream.receive() + assert 0, 'should have raised eoc!?' except trio.EndOfChannel: + print('parent got EoC as expected!') pass + # raise # TODO: should be just raise the closed resource err # directly here to enforce not allowing a re-open # of a stream to the context (at least until a time of # if/when we decide that's a good idea?) 
try: - with trio.fail_after(0.5): + with trio.fail_after(timeout): async with ctx.open_stream() as stream: pass except trio.ClosedResourceError: pass + # if ctx._rx_chan._state.data: + # await tractor.pause() + await portal.cancel_actor() @tractor.context async def expect_cancelled( ctx: Context, + send_before_receive: bool = False, ) -> None: global _state @@ -538,6 +572,10 @@ async def expect_cancelled( try: async with ctx.open_stream() as stream: + + if send_before_receive: + await stream.send('yo') + async for msg in stream: await stream.send(msg) # echo server @@ -567,23 +605,46 @@ async def expect_cancelled( assert 0, "callee wasn't cancelled !?" +@pytest.mark.parametrize( + 'child_send_before_receive', + [ + False, + True, + ], + ids=lambda item: f'child_send_before_receive={item}' +) +@pytest.mark.parametrize( + 'rent_wait_for_msg', + [ + False, + True, + ], + ids=lambda item: f'rent_wait_for_msg={item}' +) @pytest.mark.parametrize( 'use_ctx_cancel_method', - [False, True], + [ + False, + 'pre_stream', + 'post_stream_open', + 'post_stream_close', + ], + ids=lambda item: f'use_ctx_cancel_method={item}' ) @tractor_test -async def test_caller_closes_ctx_after_callee_opens_stream( - use_ctx_cancel_method: bool, +async def test_parent_exits_ctx_after_child_enters_stream( + use_ctx_cancel_method: bool|str, debug_mode: bool, + rent_wait_for_msg: bool, + child_send_before_receive: bool, ): ''' - caller context closes without using/opening stream + Parent-side of IPC context closes without sending on `MsgStream`. 
''' async with tractor.open_nursery( debug_mode=debug_mode, ) as an: - root: Actor = current_actor() portal = await an.start_actor( 'ctx_cancelled', @@ -592,41 +653,52 @@ async def test_caller_closes_ctx_after_callee_opens_stream( async with portal.open_context( expect_cancelled, + send_before_receive=child_send_before_receive, ) as (ctx, sent): assert sent is None await portal.run(assert_state, value=True) # call `ctx.cancel()` explicitly - if use_ctx_cancel_method: + if use_ctx_cancel_method == 'pre_stream': await ctx.cancel() # NOTE: means the local side `ctx._scope` will # have been cancelled by an ctxc ack and thus # `._scope.cancelled_caught` should be set. - try: + async with ( + expect_ctxc( + # XXX: the cause is US since we call + # `Context.cancel()` just above! + yay=True, + + # XXX: must be propagated to __aexit__ + # and should be silently absorbed there + # since we called `.cancel()` just above ;) + reraise=True, + ) as maybe_ctxc, + ): async with ctx.open_stream() as stream: - async for msg in stream: - pass - except tractor.ContextCancelled as ctxc: - # XXX: the cause is US since we call - # `Context.cancel()` just above! - assert ( - ctxc.canceller - == - current_actor().uid - == - root.uid - ) + if rent_wait_for_msg: + async for msg in stream: + print(f'PARENT rx: {msg!r}\n') + break - # XXX: must be propagated to __aexit__ - # and should be silently absorbed there - # since we called `.cancel()` just above ;) - raise + if use_ctx_cancel_method == 'post_stream_open': + await ctx.cancel() - else: - assert 0, "Should have context cancelled?" 
+ if use_ctx_cancel_method == 'post_stream_close': + await ctx.cancel() + + ctxc: tractor.ContextCancelled = maybe_ctxc.value + assert ( + ctxc.canceller + == + current_actor().uid + == + root.uid + ) # channel should still be up assert portal.channel.connected() @@ -637,13 +709,20 @@ async def test_caller_closes_ctx_after_callee_opens_stream( value=False, ) + # XXX CHILD-BLOCKS case, we SHOULD NOT exit from the + # `.open_context()` before the child has returned, + # errored or been cancelled! else: try: - with trio.fail_after(0.2): - await ctx.result() + with trio.fail_after( + 0.5 # if not debug_mode else 999 + ): + res = await ctx.wait_for_result() + assert res is not tractor._context.Unresolved assert 0, "Callee should have blocked!?" except trio.TooSlowError: - # NO-OP -> since already called above + # NO-OP -> since already triggered by + # `trio.fail_after()` above! await ctx.cancel() # NOTE: local scope should have absorbed the cancellation since @@ -683,7 +762,7 @@ async def test_caller_closes_ctx_after_callee_opens_stream( @tractor_test -async def test_multitask_caller_cancels_from_nonroot_task( +async def test_multitask_parent_cancels_from_nonroot_task( debug_mode: bool, ): async with tractor.open_nursery( @@ -735,7 +814,6 @@ async def test_multitask_caller_cancels_from_nonroot_task( @tractor.context async def cancel_self( - ctx: Context, ) -> None: @@ -775,7 +853,7 @@ async def cancel_self( @tractor_test -async def test_callee_cancels_before_started( +async def test_child_cancels_before_started( debug_mode: bool, ): ''' @@ -826,8 +904,7 @@ async def never_open_stream( @tractor.context -async def keep_sending_from_callee( - +async def keep_sending_from_child( ctx: Context, msg_buffer_size: int|None = None, @@ -850,7 +927,7 @@ async def keep_sending_from_callee( 'overrun_by', [ ('caller', 1, never_open_stream), - ('callee', 0, keep_sending_from_callee), + ('callee', 0, keep_sending_from_child), ], ids=[ ('caller_1buf_never_open_stream'), @@ -931,8 
+1008,7 @@ def test_one_end_stream_not_opened( @tractor.context async def echo_back_sequence( - - ctx: Context, + ctx: Context, seq: list[int], wait_for_cancel: bool, allow_overruns_side: str, -- 2.34.1 From 99958cc7278ca279dd4eb58960f9ee00726885e6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 11 Mar 2025 14:31:53 -0400 Subject: [PATCH 12/19] Fix msg-draining on `parent_never_opened_stream`! Repairs a bug in `drain_to_final_msg()` where in the `Yield()` case block we weren't guarding against the `ctx._stream is None` edge case which should be treated as a `continue`-draining (not a `break` or attr-error!!) situation since the peer task may be continuing to send `Yield` but has not yet sent an outcome msg (one of `Return/Error/ContextCancelled`) to terminate the loop. Ensure we explicitly warn about this case as well as `.cancel()` emit on a taskc. Thanks again to @guille for discovering this! Also add temporary `.info()`s around rxed `Return` msgs as part of trying to debug a different bug discovered while updating the context-semantics test suite (in a prior commit). 
--- tractor/msg/_ops.py | 84 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 64 insertions(+), 20 deletions(-) diff --git a/tractor/msg/_ops.py b/tractor/msg/_ops.py index 839be532..5f4b9fe8 100644 --- a/tractor/msg/_ops.py +++ b/tractor/msg/_ops.py @@ -186,10 +186,16 @@ class PldRx(Struct): msg: MsgType = ( ipc_msg or - # sync-rx msg from underlying IPC feeder (mem-)chan ipc._rx_chan.receive_nowait() ) + if ( + type(msg) is Return + ): + log.info( + f'Rxed final result msg\n' + f'{msg}\n' + ) return self.decode_pld( msg, ipc=ipc, @@ -219,6 +225,13 @@ class PldRx(Struct): # async-rx msg from underlying IPC feeder (mem-)chan await ipc._rx_chan.receive() ) + if ( + type(msg) is Return + ): + log.info( + f'Rxed final result msg\n' + f'{msg}\n' + ) return self.decode_pld( msg=msg, ipc=ipc, @@ -407,8 +420,6 @@ class PldRx(Struct): __tracebackhide__: bool = False raise - dec_msg = decode_pld - async def recv_msg_w_pld( self, ipc: Context|MsgStream, @@ -422,12 +433,19 @@ class PldRx(Struct): ) -> tuple[MsgType, PayloadT]: ''' - Retrieve the next avail IPC msg, decode it's payload, and return - the pair of refs. + Retrieve the next avail IPC msg, decode it's payload, and + return the pair of refs. ''' __tracebackhide__: bool = hide_tb msg: MsgType = await ipc._rx_chan.receive() + if ( + type(msg) is Return + ): + log.info( + f'Rxed final result msg\n' + f'{msg}\n' + ) if passthrough_non_pld_msgs: match msg: @@ -444,6 +462,10 @@ class PldRx(Struct): hide_tb=hide_tb, **kwargs, ) + # log.runtime( + # f'Delivering payload msg\n' + # f'{msg}\n' + # ) return msg, pld @@ -538,8 +560,8 @@ async def maybe_limit_plds( async def drain_to_final_msg( ctx: Context, - hide_tb: bool = True, msg_limit: int = 6, + hide_tb: bool = True, ) -> tuple[ Return|None, @@ -568,8 +590,8 @@ async def drain_to_final_msg( even after ctx closure and the `.open_context()` block exit. 
''' - __tracebackhide__: bool = hide_tb raise_overrun: bool = not ctx._allow_overruns + parent_never_opened_stream: bool = ctx._stream is None # wait for a final context result by collecting (but # basically ignoring) any bi-dir-stream msgs still in transit @@ -578,7 +600,8 @@ async def drain_to_final_msg( result_msg: Return|Error|None = None while not ( ctx.maybe_error - and not ctx._final_result_is_set() + and + not ctx._final_result_is_set() ): try: # receive all msgs, scanning for either a final result @@ -631,6 +654,11 @@ async def drain_to_final_msg( ) __tracebackhide__: bool = False + else: + log.cancel( + f'IPC ctx cancelled externally during result drain ?\n' + f'{ctx}' + ) # CASE 2: mask the local cancelled-error(s) # only when we are sure the remote error is # the source cause of this local task's @@ -662,17 +690,24 @@ async def drain_to_final_msg( case Yield(): pre_result_drained.append(msg) if ( - (ctx._stream.closed - and (reason := 'stream was already closed') - ) - or (ctx.cancel_acked - and (reason := 'ctx cancelled other side') - ) - or (ctx._cancel_called - and (reason := 'ctx called `.cancel()`') - ) - or (len(pre_result_drained) > msg_limit - and (reason := f'"yield" limit={msg_limit}') + not parent_never_opened_stream + and ( + (ctx._stream.closed + and + (reason := 'stream was already closed') + ) or + (ctx.cancel_acked + and + (reason := 'ctx cancelled other side') + ) + or (ctx._cancel_called + and + (reason := 'ctx called `.cancel()`') + ) + or (len(pre_result_drained) > msg_limit + and + (reason := f'"yield" limit={msg_limit}') + ) ) ): log.cancel( @@ -690,7 +725,7 @@ async def drain_to_final_msg( # drain up to the `msg_limit` hoping to get # a final result or error/ctxc. 
else: - log.warning( + report: str = ( 'Ignoring "yield" msg during `ctx.result()` drain..\n' f'<= {ctx.chan.uid}\n' f' |_{ctx._nsf}()\n\n' @@ -699,6 +734,14 @@ async def drain_to_final_msg( f'{pretty_struct.pformat(msg)}\n' ) + if parent_never_opened_stream: + report = ( + f'IPC ctx never opened stream on {ctx.side!r}-side!\n' + f'\n' + # f'{ctx}\n' + ) + report + + log.warning(report) continue # stream terminated, but no result yet.. @@ -790,6 +833,7 @@ async def drain_to_final_msg( f'{ctx.outcome}\n' ) + __tracebackhide__: bool = hide_tb return ( result_msg, pre_result_drained, -- 2.34.1 From 9b89d79ef65d8ed4d32141a06fc29430b29d79f5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 12 Mar 2025 13:15:48 -0400 Subject: [PATCH 13/19] Complete rename to parent->child IPC ctx peers Now changed in all comments docs **and** test-code content such that we aren't using the "caller"->"callee" semantics anymore. --- tests/test_context_stream_semantics.py | 105 ++++++++++++------------- 1 file changed, 49 insertions(+), 56 deletions(-) diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index 29e99b2e..14cb9cc6 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -38,9 +38,9 @@ from tractor._testing import ( # - standard setup/teardown: # ``Portal.open_context()`` starts a new # remote task context in another actor. The target actor's task must -# call ``Context.started()`` to unblock this entry on the caller side. -# the callee task executes until complete and returns a final value -# which is delivered to the caller side and retreived via +# call ``Context.started()`` to unblock this entry on the parent side. +# the child task executes until complete and returns a final value +# which is delivered to the parent side and retreived via # ``Context.result()``. 
# - cancel termination: @@ -170,9 +170,9 @@ async def assert_state(value: bool): [False, ValueError, KeyboardInterrupt], ) @pytest.mark.parametrize( - 'callee_blocks_forever', + 'child_blocks_forever', [False, True], - ids=lambda item: f'callee_blocks_forever={item}' + ids=lambda item: f'child_blocks_forever={item}' ) @pytest.mark.parametrize( 'pointlessly_open_stream', @@ -181,7 +181,7 @@ async def assert_state(value: bool): ) def test_simple_context( error_parent, - callee_blocks_forever, + child_blocks_forever, pointlessly_open_stream, debug_mode: bool, ): @@ -204,13 +204,13 @@ def test_simple_context( portal.open_context( simple_setup_teardown, data=10, - block_forever=callee_blocks_forever, + block_forever=child_blocks_forever, ) as (ctx, sent), ): assert current_ipc_ctx() is ctx assert sent == 11 - if callee_blocks_forever: + if child_blocks_forever: await portal.run(assert_state, value=True) else: assert await ctx.result() == 'yo' @@ -220,7 +220,7 @@ def test_simple_context( if error_parent: raise error_parent - if callee_blocks_forever: + if child_blocks_forever: await ctx.cancel() else: # in this case the stream will send a @@ -259,9 +259,9 @@ def test_simple_context( @pytest.mark.parametrize( - 'callee_returns_early', + 'child_returns_early', [True, False], - ids=lambda item: f'callee_returns_early={item}' + ids=lambda item: f'child_returns_early={item}' ) @pytest.mark.parametrize( 'cancel_method', @@ -273,14 +273,14 @@ def test_simple_context( [True, False], ids=lambda item: f'chk_ctx_result_before_exit={item}' ) -def test_caller_cancels( +def test_parent_cancels( cancel_method: str, chk_ctx_result_before_exit: bool, - callee_returns_early: bool, + child_returns_early: bool, debug_mode: bool, ): ''' - Verify that when the opening side of a context (aka the caller) + Verify that when the opening side of a context (aka the parent) cancels that context, the ctx does not raise a cancelled when either calling `.result()` or on context exit. 
@@ -294,7 +294,7 @@ def test_caller_cancels( if ( cancel_method == 'portal' - and not callee_returns_early + and not child_returns_early ): try: res = await ctx.result() @@ -318,7 +318,7 @@ def test_caller_cancels( pytest.fail(f'should not have raised ctxc\n{ctxc}') # we actually get a result - if callee_returns_early: + if child_returns_early: assert res == 'yo' assert ctx.outcome is res assert ctx.maybe_error is None @@ -362,14 +362,14 @@ def test_caller_cancels( ) timeout: float = ( 0.5 - if not callee_returns_early + if not child_returns_early else 2 ) with trio.fail_after(timeout): async with ( expect_ctxc( yay=( - not callee_returns_early + not child_returns_early and cancel_method == 'portal' ) ), @@ -377,13 +377,13 @@ def test_caller_cancels( portal.open_context( simple_setup_teardown, data=10, - block_forever=not callee_returns_early, + block_forever=not child_returns_early, ) as (ctx, sent), ): - if callee_returns_early: + if child_returns_early: # ensure we block long enough before sending - # a cancel such that the callee has already + # a cancel such that the child has already # returned it's result. await trio.sleep(0.5) @@ -421,7 +421,7 @@ def test_caller_cancels( # which should in turn cause `ctx._scope` to # catch any cancellation? 
if ( - not callee_returns_early + not child_returns_early and cancel_method != 'portal' ): assert not ctx._scope.cancelled_caught @@ -430,11 +430,11 @@ def test_caller_cancels( # basic stream terminations: -# - callee context closes without using stream -# - caller context closes without using stream -# - caller context calls `Context.cancel()` while streaming -# is ongoing resulting in callee being cancelled -# - callee calls `Context.cancel()` while streaming and caller +# - child context closes without using stream +# - parent context closes without using stream +# - parent context calls `Context.cancel()` while streaming +# is ongoing resulting in child being cancelled +# - child calls `Context.cancel()` while streaming and parent # sees stream terminated in `RemoteActorError` # TODO: future possible features @@ -470,7 +470,7 @@ async def test_child_exits_ctx_after_stream_open( parent_send_before_receive: bool, ): ''' - callee context closes without using stream. + child context closes without using stream. This should result in a msg sequence |__ @@ -485,13 +485,7 @@ async def test_child_exits_ctx_after_stream_open( ''' timeout: float = ( - 0.5 if ( - not debug_mode - # NOTE, for debugging final - # Return-consumed-n-discarded-ishue! - # and - # not parent_send_before_receive - ) else 999 + 0.5 if not debug_mode else 999 ) async with tractor.open_nursery( debug_mode=debug_mode, @@ -602,7 +596,7 @@ async def expect_cancelled( raise else: - assert 0, "callee wasn't cancelled !?" + assert 0, "child wasn't cancelled !?" @pytest.mark.parametrize( @@ -857,7 +851,7 @@ async def test_child_cancels_before_started( debug_mode: bool, ): ''' - Callee calls `Context.cancel()` while streaming and caller + Callee calls `Context.cancel()` while streaming and parent sees stream terminated in `ContextCancelled`. ''' @@ -910,7 +904,7 @@ async def keep_sending_from_child( ) -> None: ''' - Send endlessly on the calleee stream. + Send endlessly on the child stream. 
''' await ctx.started() @@ -918,7 +912,7 @@ async def keep_sending_from_child( msg_buffer_size=msg_buffer_size, ) as stream: for msg in count(): - print(f'callee sending {msg}') + print(f'child sending {msg}') await stream.send(msg) await trio.sleep(0.01) @@ -926,12 +920,12 @@ async def keep_sending_from_child( @pytest.mark.parametrize( 'overrun_by', [ - ('caller', 1, never_open_stream), - ('callee', 0, keep_sending_from_child), + ('parent', 1, never_open_stream), + ('child', 0, keep_sending_from_child), ], ids=[ - ('caller_1buf_never_open_stream'), - ('callee_0buf_keep_sending_from_callee'), + ('parent_1buf_never_open_stream'), + ('child_0buf_keep_sending_from_child'), ] ) def test_one_end_stream_not_opened( @@ -962,8 +956,7 @@ def test_one_end_stream_not_opened( ) as (ctx, sent): assert sent is None - if 'caller' in overrunner: - + if 'parent' in overrunner: async with ctx.open_stream() as stream: # itersend +1 msg more then the buffer size @@ -978,7 +971,7 @@ def test_one_end_stream_not_opened( await trio.sleep_forever() else: - # callee overruns caller case so we do nothing here + # child overruns parent case so we do nothing here await trio.sleep_forever() await portal.cancel_actor() @@ -986,19 +979,19 @@ def test_one_end_stream_not_opened( # 2 overrun cases and the no overrun case (which pushes right up to # the msg limit) if ( - overrunner == 'caller' + overrunner == 'parent' ): with pytest.raises(tractor.RemoteActorError) as excinfo: trio.run(main) assert excinfo.value.boxed_type == StreamOverrun - elif overrunner == 'callee': + elif overrunner == 'child': with pytest.raises(tractor.RemoteActorError) as excinfo: trio.run(main) # TODO: embedded remote errors so that we can verify the source - # error? the callee delivers an error which is an overrun + # error? the child delivers an error which is an overrun # wrapped in a remote actor error. 
assert excinfo.value.boxed_type == tractor.RemoteActorError @@ -1017,12 +1010,12 @@ async def echo_back_sequence( ) -> None: ''' - Send endlessly on the calleee stream using a small buffer size + Send endlessly on the child stream using a small buffer size setting on the contex to simulate backlogging that would normally cause overruns. ''' - # NOTE: ensure that if the caller is expecting to cancel this task + # NOTE: ensure that if the parent is expecting to cancel this task # that we stay echoing much longer then they are so we don't # return early instead of receive the cancel msg. total_batches: int = ( @@ -1072,18 +1065,18 @@ async def echo_back_sequence( if be_slow: await trio.sleep(0.05) - print('callee waiting on next') + print('child waiting on next') - print(f'callee echoing back latest batch\n{batch}') + print(f'child echoing back latest batch\n{batch}') for msg in batch: - print(f'callee sending msg\n{msg}') + print(f'child sending msg\n{msg}') await stream.send(msg) try: return 'yo' finally: print( - 'exiting callee with context:\n' + 'exiting child with context:\n' f'{pformat(ctx)}\n' ) @@ -1137,7 +1130,7 @@ def test_maybe_allow_overruns_stream( debug_mode=debug_mode, ) as an: portal = await an.start_actor( - 'callee_sends_forever', + 'child_sends_forever', enable_modules=[__name__], loglevel=loglevel, debug_mode=debug_mode, -- 2.34.1 From de8d8afc41f46e7e0579e2ebd05dde0fc1c8c077 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 12 Mar 2025 13:47:53 -0400 Subject: [PATCH 14/19] Rename ext-types with `msgspec` suite module --- tests/{test_caps_based_msging.py => test_ext_types_msgspec.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/{test_caps_based_msging.py => test_ext_types_msgspec.py} (100%) diff --git a/tests/test_caps_based_msging.py b/tests/test_ext_types_msgspec.py similarity index 100% rename from tests/test_caps_based_msging.py rename to tests/test_ext_types_msgspec.py -- 2.34.1 From 
7b43bd9f25e7792ee4ec2f14b0be50cf51def3c1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 12 Mar 2025 13:49:58 -0400 Subject: [PATCH 15/19] Slight `PldRx` rework to simplify Namely renaming and tweaking the `MsgType` receiving methods, - `.recv_msg()` from what was `.recv_msg_w_pld()` which both receives the IPC msg from the underlying `._rx_chan` and then decodes its payload with `.decode_pld()`; it now also log reports on the different "stage of SC dialog protocol" msg types via a `match/case`. - a new `.recv_msg_nowait()` sync equivalent of ^ (*was* `.recv_pld_nowait()`) who's use was the source of a recently discovered bug where any final `Return.pld` is being consumed-n-discarded by by `MsgStream.aclose()` depending on ctx/stream teardown race conditions.. Also, - remove all the "instance persistent" ipc-ctx attrs, specifically the optional `_ipc`, `_ctx` and the `.wraps_ipc()` cm, since none of them were ever really needed/used; all methods which require a `Context/MsgStream` are explicitly always passed. - update a buncha typing namely to use the more generic-styled `PayloadT` over `Any` and obviously `MsgType[PayloadT]`. --- tractor/msg/_ops.py | 151 ++++++++++++++++++++------------------------ 1 file changed, 68 insertions(+), 83 deletions(-) diff --git a/tractor/msg/_ops.py b/tractor/msg/_ops.py index 5f4b9fe8..fbbbecff 100644 --- a/tractor/msg/_ops.py +++ b/tractor/msg/_ops.py @@ -110,33 +110,11 @@ class PldRx(Struct): # TODO: better to bind it here? # _rx_mc: trio.MemoryReceiveChannel _pld_dec: MsgDec - _ctx: Context|None = None - _ipc: Context|MsgStream|None = None @property def pld_dec(self) -> MsgDec: return self._pld_dec - # TODO: a better name? - # -[ ] when would this be used as it avoids needingn to pass the - # ipc prim to every method - @cm - def wraps_ipc( - self, - ipc_prim: Context|MsgStream, - - ) -> PldRx: - ''' - Apply this payload receiver to an IPC primitive type, one - of `Context` or `MsgStream`. 
- - ''' - self._ipc = ipc_prim - try: - yield self - finally: - self._ipc = None - @cm def limit_plds( self, @@ -169,7 +147,7 @@ class PldRx(Struct): def dec(self) -> msgpack.Decoder: return self._pld_dec.dec - def recv_pld_nowait( + def recv_msg_nowait( self, # TODO: make this `MsgStream` compat as well, see above^ # ipc_prim: Context|MsgStream, @@ -180,7 +158,15 @@ class PldRx(Struct): hide_tb: bool = False, **dec_pld_kwargs, - ) -> Any|Raw: + ) -> tuple[ + MsgType[PayloadT], + PayloadT, + ]: + ''' + Attempt to non-blocking receive a message from the `._rx_chan` and + unwrap it's payload delivering the pair to the caller. + + ''' __tracebackhide__: bool = hide_tb msg: MsgType = ( @@ -189,31 +175,78 @@ class PldRx(Struct): # sync-rx msg from underlying IPC feeder (mem-)chan ipc._rx_chan.receive_nowait() ) - if ( - type(msg) is Return - ): - log.info( - f'Rxed final result msg\n' - f'{msg}\n' - ) - return self.decode_pld( + pld: PayloadT = self.decode_pld( msg, ipc=ipc, expect_msg=expect_msg, hide_tb=hide_tb, **dec_pld_kwargs, ) + return ( + msg, + pld, + ) + + async def recv_msg( + self, + ipc: Context|MsgStream, + expect_msg: MsgType, + + # NOTE: ONLY for handling `Stop`-msgs that arrive during + # a call to `drain_to_final_msg()` above! + passthrough_non_pld_msgs: bool = True, + hide_tb: bool = True, + + **decode_pld_kwargs, + + ) -> tuple[MsgType, PayloadT]: + ''' + Retrieve the next avail IPC msg, decode its payload, and + return the (msg, pld) pair. + + ''' + __tracebackhide__: bool = hide_tb + msg: MsgType = await ipc._rx_chan.receive() + match msg: + case Return()|Error(): + log.runtime( + f'Rxed final outcome msg\n' + f'{msg}\n' + ) + case Stop(): + log.runtime( + f'Rxed stream stopped msg\n' + f'{msg}\n' + ) + if passthrough_non_pld_msgs: + return msg, None + + # TODO: is there some way we can inject the decoded + # payload into an existing output buffer for the original + # msg instance? 
+ pld: PayloadT = self.decode_pld( + msg, + ipc=ipc, + expect_msg=expect_msg, + hide_tb=hide_tb, + + **decode_pld_kwargs, + ) + return ( + msg, + pld, + ) async def recv_pld( self, ipc: Context|MsgStream, - ipc_msg: MsgType|None = None, + ipc_msg: MsgType[PayloadT]|None = None, expect_msg: Type[MsgType]|None = None, hide_tb: bool = True, **dec_pld_kwargs, - ) -> Any|Raw: + ) -> PayloadT: ''' Receive a `MsgType`, then decode and return its `.pld` field. @@ -420,54 +453,6 @@ class PldRx(Struct): __tracebackhide__: bool = False raise - async def recv_msg_w_pld( - self, - ipc: Context|MsgStream, - expect_msg: MsgType, - - # NOTE: generally speaking only for handling `Stop`-msgs that - # arrive during a call to `drain_to_final_msg()` above! - passthrough_non_pld_msgs: bool = True, - hide_tb: bool = True, - **kwargs, - - ) -> tuple[MsgType, PayloadT]: - ''' - Retrieve the next avail IPC msg, decode it's payload, and - return the pair of refs. - - ''' - __tracebackhide__: bool = hide_tb - msg: MsgType = await ipc._rx_chan.receive() - if ( - type(msg) is Return - ): - log.info( - f'Rxed final result msg\n' - f'{msg}\n' - ) - - if passthrough_non_pld_msgs: - match msg: - case Stop(): - return msg, None - - # TODO: is there some way we can inject the decoded - # payload into an existing output buffer for the original - # msg instance? - pld: PayloadT = self.decode_pld( - msg, - ipc=ipc, - expect_msg=expect_msg, - hide_tb=hide_tb, - **kwargs, - ) - # log.runtime( - # f'Delivering payload msg\n' - # f'{msg}\n' - # ) - return msg, pld - @cm def limit_plds( @@ -607,7 +592,7 @@ async def drain_to_final_msg( # receive all msgs, scanning for either a final result # or error; the underlying call should never raise any # remote error directly! 
- msg, pld = await ctx._pld_rx.recv_msg_w_pld( + msg, pld = await ctx._pld_rx.recv_msg( ipc=ctx, expect_msg=Return, raise_error=False, -- 2.34.1 From c8dd4a3452b441eb44bdbdff70699b64f887982e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 12 Mar 2025 15:03:55 -0400 Subject: [PATCH 16/19] Add `Context._outcome_msg` use new `PldRx` API Such that any `Return` is always capture for each ctx instance and set in `._deliver_msg()` normally; ensures we can at least introspect for it when missing (like in a recently discovered stream teardown race bug). Yes this augments the already existing `._result` which is dedicated for the `._outcome_msg.pld` in the non-error case; we might want to see if there's a nicer way to directly proxy ref to that without getting the pre-pld-decoded `Raw` form with `msgspec`? Also use the new `ctx._pld_rx.recv_msg()` and drop assigning `pld_rx._ctx`. --- tractor/_context.py | 48 ++++++++++++++++++++++++++++++++++++--------- tractor/_portal.py | 2 +- 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/tractor/_context.py b/tractor/_context.py index 5d6ccf69..201e920a 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -82,6 +82,7 @@ from .msg import ( MsgType, NamespacePath, PayloadT, + Return, Started, Stop, Yield, @@ -245,11 +246,13 @@ class Context: # a drain loop? # _res_scope: trio.CancelScope|None = None + _outcome_msg: Return|Error|ContextCancelled = Unresolved + # on a clean exit there should be a final value # delivered from the far end "callee" task, so # this value is only set on one side. # _result: Any | int = None - _result: Any|Unresolved = Unresolved + _result: PayloadT|Unresolved = Unresolved # if the local "caller" task errors this value is always set # to the error that was captured in the @@ -1199,9 +1202,11 @@ class Context: ''' __tracebackhide__: bool = hide_tb - assert self._portal, ( - '`Context.wait_for_result()` can not be called from callee side!' 
- ) + if not self._portal: + raise RuntimeError( + 'Invalid usage of `Context.wait_for_result()`!\n' + 'Not valid on child-side IPC ctx!\n' + ) if self._final_result_is_set(): return self._result @@ -1222,6 +1227,8 @@ class Context: # since every message should be delivered via the normal # `._deliver_msg()` route which will appropriately set # any `.maybe_error`. + outcome_msg: Return|Error|ContextCancelled + drained_msgs: list[MsgType] ( outcome_msg, drained_msgs, @@ -1229,11 +1236,19 @@ class Context: ctx=self, hide_tb=hide_tb, ) - drained_status: str = ( 'Ctx drained to final outcome msg\n\n' f'{outcome_msg}\n' ) + + # ?XXX, should already be set in `._deliver_msg()` right? + if self._outcome_msg is not Unresolved: + # from .devx import _debug + # await _debug.pause() + assert self._outcome_msg is outcome_msg + else: + self._outcome_msg = outcome_msg + if drained_msgs: drained_status += ( '\n' @@ -1741,7 +1756,6 @@ class Context: f'{structfmt(msg)}\n' ) - # NOTE: if an error is deteced we should always still # send it through the feeder-mem-chan and expect # it to be raised by any context (stream) consumer @@ -1753,6 +1767,21 @@ class Context: # normally the task that should get cancelled/error # from some remote fault! send_chan.send_nowait(msg) + match msg: + case Stop(): + if (stream := self._stream): + stream._stop_msg = msg + + case Return(): + if not self._outcome_msg: + log.warning( + f'Setting final outcome msg AFTER ' + f'`._rx_chan.send()`??\n' + f'\n' + f'{msg}' + ) + self._outcome_msg = msg + return True except trio.BrokenResourceError: @@ -2009,7 +2038,7 @@ async def open_context_from_portal( # the dialog, the `Error` msg should be raised from the `msg` # handling block below. 
try: - started_msg, first = await ctx._pld_rx.recv_msg_w_pld( + started_msg, first = await ctx._pld_rx.recv_msg( ipc=ctx, expect_msg=Started, passthrough_non_pld_msgs=False, @@ -2374,7 +2403,8 @@ async def open_context_from_portal( # displaying `ContextCancelled` traces where the # cause of crash/exit IS due to something in # user/app code on either end of the context. - and not rxchan._closed + and + not rxchan._closed ): # XXX NOTE XXX: and again as per above, we mask any # `trio.Cancelled` raised here so as to NOT mask @@ -2433,6 +2463,7 @@ async def open_context_from_portal( # FINALLY, remove the context from runtime tracking and # exit! log.runtime( + # log.cancel( f'De-allocating IPC ctx opened with {ctx.side!r} peer \n' f'uid: {uid}\n' f'cid: {ctx.cid}\n' @@ -2488,7 +2519,6 @@ def mk_context( _caller_info=caller_info, **kwargs, ) - pld_rx._ctx = ctx ctx._result = Unresolved return ctx diff --git a/tractor/_portal.py b/tractor/_portal.py index 7fbf69b2..cee10c47 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -184,7 +184,7 @@ class Portal: ( self._final_result_msg, self._final_result_pld, - ) = await self._expect_result_ctx._pld_rx.recv_msg_w_pld( + ) = await self._expect_result_ctx._pld_rx.recv_msg( ipc=self._expect_result_ctx, expect_msg=Return, ) -- 2.34.1 From 8921443503f25a1931ef05e1e71ade8b591e5adb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 12 Mar 2025 16:24:39 -0400 Subject: [PATCH 17/19] Add `MsgStream._stop_msg` use new `PldRx` API In particular ensuring we use `ctx._pld_rx.recv_msg_nowait()` from `.receive_nowait()` (which is called from `.aclose()`) such that we ALWAYS (can) set the surrounding `Context._result/._outcome_msg` attrs on reception of a final `Return`!! This fixes a final stream-teardown-race-condition-bug where prior we normally didn't set the `Context._result/._outcome_msg` in such cases. 
This is **precisely because** `.receive_nowait()` only returns the `pld` and when called from `.aclose()` this value is discarded, meaning so is its boxing `Return` despite consuming it from the underlying `._rx_chan`.. Longer term this should be solved differently by ensuring such races cases are handled at a higher scope like inside `Context._deliver_msg()` or the `Portal.open_context()` enter/exit blocks? Add a detailed warning note and todos for all this around the special case block! --- tractor/_streaming.py | 118 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 95 insertions(+), 23 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 58e9b069..2ff2d41c 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -45,9 +45,11 @@ from .trionics import ( BroadcastReceiver, ) from tractor.msg import ( - # Return, - # Stop, + Error, + Return, + Stop, MsgType, + PayloadT, Yield, ) @@ -70,8 +72,7 @@ class MsgStream(trio.abc.Channel): A bidirectional message stream for receiving logically sequenced values over an inter-actor IPC `Channel`. - This is the type returned to a local task which entered either - `Portal.open_stream_from()` or `Context.open_stream()`. + Termination rules: @@ -94,6 +95,9 @@ class MsgStream(trio.abc.Channel): self._rx_chan = rx_chan self._broadcaster = _broadcaster + # any actual IPC msg which is effectively an `EndOfStream` + self._stop_msg: bool|Stop = False + # flag to denote end of stream self._eoc: bool|trio.EndOfChannel = False self._closed: bool|trio.ClosedResourceError = False @@ -125,16 +129,67 @@ class MsgStream(trio.abc.Channel): def receive_nowait( self, expect_msg: MsgType = Yield, - ): + ) -> PayloadT: ctx: Context = self._ctx - return ctx._pld_rx.recv_pld_nowait( + ( + msg, + pld, + ) = ctx._pld_rx.recv_msg_nowait( ipc=self, expect_msg=expect_msg, ) + # ?TODO, maybe factor this into a hyper-common `unwrap_pld()` + # + match msg: + + # XXX, these never seems to ever hit? cool? 
+ case Stop(): + log.cancel( + f'Msg-stream was ended via stop msg\n' + f'{msg}' + ) + case Error(): + log.error( + f'Msg-stream was ended via error msg\n' + f'{msg}' + ) + + # XXX NOTE, always set any final result on the ctx to + # avoid teardown race conditions where previously this msg + # would be consumed silently (by `.aclose()` doing its + # own "msg drain loop" but WITHOUT those `drained: lists[MsgType]` + # being post-close-processed! + # + # !!TODO, see the equiv todo-comment in `.receive()` + # around the `if drained:` where we should prolly + # ACTUALLY be doing this post-close processing?? + # + case Return(pld=pld): + log.warning( + f'Msg-stream final result msg for IPC ctx?\n' + f'{msg}' + ) + # XXX TODO, this **should be covered** by higher + # scoped runtime-side method calls such as + # `Context._deliver_msg()`, so you should never + # really see the warning above or else something + # racy/out-of-order is likely going on between + # actor-runtime-side push tasks and the user-app-side + # consume tasks! + # -[ ] figure out that set of race cases and fix! + # -[ ] possibly return the `msg` given an input + # arg-flag is set so we can process the `Return` + # from the `.aclose()` caller? + # + # breakpoint() # to debug this RACE CASE! 
+ ctx._result = pld + ctx._outcome_msg = msg + + return pld + async def receive( self, - hide_tb: bool = False, ): ''' @@ -154,7 +209,7 @@ class MsgStream(trio.abc.Channel): # except trio.EndOfChannel: # raise StopAsyncIteration # - # see ``.aclose()`` for notes on the old behaviour prior to + # see `.aclose()` for notes on the old behaviour prior to # introducing this if self._eoc: raise self._eoc @@ -165,7 +220,11 @@ class MsgStream(trio.abc.Channel): src_err: Exception|None = None # orig tb try: ctx: Context = self._ctx - return await ctx._pld_rx.recv_pld(ipc=self) + pld = await ctx._pld_rx.recv_pld( + ipc=self, + expect_msg=Yield, + ) + return pld # XXX: the stream terminates on either of: # - `self._rx_chan.receive()` raising after manual closure @@ -174,7 +233,7 @@ class MsgStream(trio.abc.Channel): # - via a `Stop`-msg received from remote peer task. # NOTE # |_ previously this was triggered by calling - # ``._rx_chan.aclose()`` on the send side of the channel + # `._rx_chan.aclose()` on the send side of the channel # inside `Actor._deliver_ctx_payload()`, but now the 'stop' # message handling gets delegated to `PldRFx.recv_pld()` # internals. @@ -198,11 +257,14 @@ class MsgStream(trio.abc.Channel): # terminated and signal this local iterator to stop drained: list[Exception|dict] = await self.aclose() if drained: - # ?TODO? pass these to the `._ctx._drained_msgs: deque` - # and then iterate them as part of any `.wait_for_result()` call? - # - # from .devx import pause - # await pause() + # ^^^^^^^^TODO? pass these to the `._ctx._drained_msgs: + # deque` and then iterate them as part of any + # `.wait_for_result()` call? + # + # -[ ] move the match-case processing from + # `.receive_nowait()` instead to right here, use it from + # a for msg in drained:` post-proc loop? 
+ # log.warning( 'Drained context msgs during closure\n\n' f'{drained}' @@ -265,9 +327,6 @@ class MsgStream(trio.abc.Channel): - more or less we try to maintain adherance to trio's `.aclose()` semantics: https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose ''' - - # rx_chan = self._rx_chan - # XXX NOTE XXX # it's SUPER IMPORTANT that we ensure we don't DOUBLE # DRAIN msgs on closure so avoid getting stuck handing on @@ -279,15 +338,16 @@ class MsgStream(trio.abc.Channel): # this stream has already been closed so silently succeed as # per ``trio.AsyncResource`` semantics. # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose + # import tractor + # await tractor.pause() return [] ctx: Context = self._ctx drained: list[Exception|dict] = [] while not drained: try: - maybe_final_msg = self.receive_nowait( - # allow_msgs=[Yield, Return], - expect_msg=Yield, + maybe_final_msg: Yield|Return = self.receive_nowait( + expect_msg=Yield|Return, ) if maybe_final_msg: log.debug( @@ -372,8 +432,10 @@ class MsgStream(trio.abc.Channel): # await rx_chan.aclose() if not self._eoc: + this_side: str = self._ctx.side + peer_side: str = self._ctx.peer_side message: str = ( - f'Stream self-closed by {self._ctx.side!r}-side before EoC\n' + f'Stream self-closed by {this_side!r}-side before EoC from {peer_side!r}\n' # } bc a stream is a "scope"/msging-phase inside an IPC f'x}}>\n' f' |_{self}\n' @@ -381,9 +443,19 @@ class MsgStream(trio.abc.Channel): log.cancel(message) self._eoc = trio.EndOfChannel(message) + if ( + (rx_chan := self._rx_chan) + and + (stats := rx_chan.statistics()).tasks_waiting_receive + ): + log.cancel( + f'Msg-stream is closing but there is still reader tasks,\n' + f'{stats}\n' + ) + # ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX? # => NO, DEFINITELY NOT! 
<= - # if we're a bi-dir ``MsgStream`` BECAUSE this same + # if we're a bi-dir `MsgStream` BECAUSE this same # core-msg-loop mem recv-chan is used to deliver the # potential final result from the surrounding inter-actor # `Context` so we don't want to close it until that -- 2.34.1 From 27a97e8b1c299ac1e1a84525de5468df8f3013be Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 12 Mar 2025 16:41:42 -0400 Subject: [PATCH 18/19] Add `.runtime()`-emit to `._invoke()` to report final result msg in the child --- tractor/_rpc.py | 4 ++++ tractor/_runtime.py | 6 ++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/tractor/_rpc.py b/tractor/_rpc.py index 086cfff6..c5daed9e 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -649,6 +649,10 @@ async def _invoke( ) # set and shuttle final result to "parent"-side task. ctx._result = res + log.runtime( + f'Sending result msg and exiting {ctx.side!r}\n' + f'{return_msg}\n' + ) await chan.send(return_msg) # NOTE: this happens IFF `ctx._scope.cancel()` is diff --git a/tractor/_runtime.py b/tractor/_runtime.py index e7faaedf..890a690a 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -836,8 +836,10 @@ class Actor: )] except KeyError: report: str = ( - 'Ignoring invalid IPC ctx msg!\n\n' - f'<=? {uid}\n\n' + 'Ignoring invalid IPC msg!?\n' + f'Ctx seems to not/no-longer exist??\n' + f'\n' + f'<=? {uid}\n' f' |_{pretty_struct.pformat(msg)}\n' ) match msg: -- 2.34.1 From eb1202937224871797f6f869957dabbc73d5d289 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 16 Mar 2025 17:20:20 -0400 Subject: [PATCH 19/19] Mask top level import of `.hilevel` Since it isn't required until the landing of the new service-manager stuff in https://pikers.dev/goodboy/tractor/pulls/12; was an oversight from commit `0607a31dddeba032a2cf7d9fe605edd9d7bb4846`. 
--- tractor/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index 6ddbf199..0c011a22 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -67,4 +67,4 @@ from ._root import ( from ._ipc import Channel as Channel from ._portal import Portal as Portal from ._runtime import Actor as Actor -from . import hilevel as hilevel +# from . import hilevel as hilevel -- 2.34.1