diff --git a/tests/test_caps_based_msging.py b/tests/test_caps_based_msging.py index cdc6d59..4d78a11 100644 --- a/tests/test_caps_based_msging.py +++ b/tests/test_caps_based_msging.py @@ -5,6 +5,11 @@ Low-level functional audits for our B~) ''' +from contextlib import ( + contextmanager as cm, + # nullcontext, +) +import importlib from typing import ( Any, Type, @@ -12,10 +17,10 @@ from typing import ( ) from msgspec import ( - structs, - msgpack, + # structs, + # msgpack, Raw, - Struct, + # Struct, ValidationError, ) import pytest @@ -24,7 +29,7 @@ import trio import tractor from tractor import ( Actor, - _state, + # _state, MsgTypeError, Context, ) @@ -42,815 +47,16 @@ from tractor.msg import ( current_codec, ) from tractor.msg.types import ( - _payload_msgs, log, - PayloadMsg, Started, - mk_msg_spec, + # _payload_msgs, + # PayloadMsg, + # mk_msg_spec, ) from tractor.msg._ops import ( limit_plds, ) - -def mk_custom_codec( - add_hooks: bool, - -) -> tuple[ - MsgCodec, # encode to send - MsgDec, # pld receive-n-decode -]: - ''' - Create custom `msgpack` enc/dec-hooks and set a `Decoder` - which only loads `pld_spec` (like `NamespacePath`) types. - - ''' - - # XXX NOTE XXX: despite defining `NamespacePath` as a type - # field on our `PayloadMsg.pld`, we still need a enc/dec_hook() pair - # to cast to/from that type on the wire. See the docs: - # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types - - # if pld_spec is Any: - # pld_spec = Raw - - nsp_codec: MsgCodec = mk_codec( - # ipc_pld_spec=Raw, # default! - - # NOTE XXX: the encode hook MUST be used no matter what since - # our `NamespacePath` is not any of a `Any` native type nor - # a `msgspec.Struct` subtype - so `msgspec` has no way to know - # how to encode it unless we provide the custom hook. - # - # AGAIN that is, regardless of whether we spec an - # `Any`-decoded-pld the enc has no knowledge (by default) - # how to enc `NamespacePath` (nsp), so we add a custom - # hook to do that ALWAYS. - enc_hook=enc_nsp if add_hooks else None, - - # XXX NOTE: pretty sure this is mutex with the `type=` to - # `Decoder`? so it won't work in tandem with the - # `ipc_pld_spec` passed above? - ext_types=[NamespacePath], - ) - # dec_hook=dec_nsp if add_hooks else None, - return nsp_codec - - -def chk_codec_applied( - expect_codec: MsgCodec, - enter_value: MsgCodec|None = None, - -) -> MsgCodec: - ''' - buncha sanity checks ensuring that the IPC channel's - context-vars are set to the expected codec and that are - ctx-var wrapper APIs match the same. - - ''' - # TODO: play with tricyle again, bc this is supposed to work - # the way we want? - # - # TreeVar - # task: trio.Task = trio.lowlevel.current_task() - # curr_codec = _ctxvar_MsgCodec.get_in(task) - - # ContextVar - # task_ctx: Context = task.context - # assert _ctxvar_MsgCodec in task_ctx - # curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec] - - # NOTE: currently we use this! - # RunVar - curr_codec: MsgCodec = current_codec() - last_read_codec = _ctxvar_MsgCodec.get() - # assert curr_codec is last_read_codec - - assert ( - (same_codec := expect_codec) is - # returned from `mk_codec()` - - # yielded value from `apply_codec()` - - # read from current task's `contextvars.Context` - curr_codec is - last_read_codec - - # the default `msgspec` settings - is not _codec._def_msgspec_codec - is not _codec._def_tractor_codec - ) - - if enter_value: - enter_value is same_codec - - -def iter_maybe_sends( - send_items: dict[Union[Type], Any] | list[tuple], - ipc_pld_spec: Union[Type] | Any, - add_codec_hooks: bool, - - codec: MsgCodec|None = None, - -) -> tuple[Any, bool]: - - if isinstance(send_items, dict): - send_items = send_items.items() - - for ( - send_type_spec, - send_value, - ) in send_items: - - expect_roundtrip: bool = False - - # values-to-typespec santiy - send_type = type(send_value) - assert send_type == send_type_spec or ( - (subtypes := getattr(send_type_spec, '__args__', None)) - and send_type in subtypes - ) - - spec_subtypes: set[Union[Type]] = ( - getattr( - ipc_pld_spec, - '__args__', - {ipc_pld_spec,}, - ) - ) - send_in_spec: bool = ( - send_type == ipc_pld_spec - or ( - ipc_pld_spec != Any - and # presume `Union` of types - send_type in spec_subtypes - ) - or ( - ipc_pld_spec == Any - and - send_type != NamespacePath - ) - ) - expect_roundtrip = ( - send_in_spec - # any spec should support all other - # builtin py values that we send - # except our custom nsp type which - # we should be able to send as long - # as we provide the custom codec hooks. - or ( - ipc_pld_spec == Any - and - send_type == NamespacePath - and - add_codec_hooks - ) - ) - - if codec is not None: - # XXX FIRST XXX ensure roundtripping works - # before touching any IPC primitives/APIs. - wire_bytes: bytes = codec.encode( - Started( - cid='blahblah', - pld=send_value, - ) - ) - # NOTE: demonstrates the decoder loading - # to via our native SCIPP msg-spec - # (structurred-conc-inter-proc-protocol) - # implemented as per, - try: - msg: Started = codec.decode(wire_bytes) - if not expect_roundtrip: - pytest.fail( - f'NOT-EXPECTED able to roundtrip value given spec:\n' - f'ipc_pld_spec -> {ipc_pld_spec}\n' - f'value -> {send_value}: {send_type}\n' - ) - - pld = msg.pld - assert pld == send_value - - except ValidationError: - if expect_roundtrip: - pytest.fail( - f'EXPECTED to roundtrip value given spec:\n' - f'ipc_pld_spec -> {ipc_pld_spec}\n' - f'value -> {send_value}: {send_type}\n' - ) - - yield ( - str(send_type), - send_value, - expect_roundtrip, - ) - - -@tractor.context -async def send_back_values( - ctx: Context, - expect_debug: bool, - pld_spec_type_strs: list[str], - add_hooks: bool, - # started_msg_bytes: bytes, - expect_ipc_send: dict[str, tuple[Any, bool]], - -) -> None: - ''' - Setup up a custom codec to load instances of `NamespacePath` - and ensure we can round trip a func ref with our parent. - - ''' - uid: tuple = tractor.current_actor().uid - - # debug mode sanity check (prolly superfluous but, meh) - assert expect_debug == _state.debug_mode() - - # init state in sub-actor should be default - chk_codec_applied( - expect_codec=_codec._def_tractor_codec, - ) - - # load pld spec from input str - ipc_pld_spec = _exts.dec_type_union( - pld_spec_type_strs, - ) - pld_spec_str = str(ipc_pld_spec) - - # same as on parent side config. - nsp_codec: MsgCodec = mk_custom_codec( - add_hooks=add_hooks, - ) - with ( - apply_codec(nsp_codec) as codec, - limit_plds(ipc_pld_spec) as codec, - ): - # we SHOULD NOT be swapping the global codec since it breaks - # `Context.starte()` roundtripping checks! - chk_codec_applied( - expect_codec=nsp_codec, - ) - # XXX SO NOT THIS! - # chk_codec_applied( - # expect_codec=nsp_codec, - # enter_value=codec, - # ) - - print( - f'{uid}: attempting `Started`-bytes DECODE..\n' - ) - try: - # msg: Started = nsp_codec.decode(started_msg_bytes) - - ipc_spec: Type = ctx._pld_rx._pld_dec.spec - expected_pld_spec_str: str = str(ipc_spec) - assert ( - pld_spec_str == expected_pld_spec_str - and - ipc_pld_spec == ipc_spec - ) - - # TODO: maybe we should add our own wrapper error so as to - # be interchange-lib agnostic? - # -[ ] the error type is wtv is raised from the hook so we - # could also require a type-class of errors for - # indicating whether the hook-failure can be handled by - # a nasty-dialog-unprot sub-sys? - except ValidationError: - - # NOTE: only in the `Any` spec case do we expect this to - # work since otherwise no spec covers a plain-ol' - # `.pld: str` - if pld_spec_str == 'Any': - raise - else: - print( - f'{uid}: (correctly) unable to DECODE `Started`-bytes\n' - # f'{started_msg_bytes}\n' - ) - - iter_send_val_items = iter(expect_ipc_send.values()) - sent: list[Any] = [] - for ( - send_value, - expect_send, - ) in iter_send_val_items: - try: - print( - f'{uid}: attempting to `.started({send_value})`\n' - f'=> expect_send: {expect_send}\n' - f'SINCE, ipc_pld_spec: {ipc_pld_spec}\n' - f'AND, codec: {codec}\n' - ) - await ctx.started(send_value) - sent.append(send_value) - if not expect_send: - - # XXX NOTE XXX THIS WON'T WORK WITHOUT SPECIAL - # `str` handling! or special debug mode IPC - # msgs! - await tractor.pause() - - raise RuntimeError( - f'NOT-EXPECTED able to roundtrip value given spec:\n' - f'ipc_pld_spec -> {ipc_pld_spec}\n' - f'value -> {send_value}: {type(send_value)}\n' - ) - - break # move on to streaming block.. - - except tractor.MsgTypeError as _mte: - mte = _mte - - if expect_send: - raise RuntimeError( - f'EXPECTED to `.started()` value given spec ??\n\n' - f'ipc_pld_spec -> {ipc_pld_spec}\n' - f'value -> {send_value}: {type(send_value)}\n' - ) - - # await tractor.pause() - raise mte - - - async with ctx.open_stream() as ipc: - print( - f'{uid}: Entering streaming block to send remaining values..' - ) - - for send_value, expect_send in iter_send_val_items: - send_type: Type = type(send_value) - print( - '------ - ------\n' - f'{uid}: SENDING NEXT VALUE\n' - f'ipc_pld_spec: {ipc_pld_spec}\n' - f'expect_send: {expect_send}\n' - f'val: {send_value}\n' - '------ - ------\n' - ) - try: - await ipc.send(send_value) - print(f'***\n{uid}-CHILD sent {send_value!r}\n***\n') - sent.append(send_value) - - # NOTE: should only raise above on - # `.started()` or a `Return` - # if not expect_send: - # raise RuntimeError( - # f'NOT-EXPECTED able to roundtrip value given spec:\n' - # f'ipc_pld_spec -> {ipc_pld_spec}\n' - # f'value -> {send_value}: {send_type}\n' - # ) - - except ValidationError: - print(f'{uid} FAILED TO SEND {send_value}!') - - # await tractor.pause() - if expect_send: - raise RuntimeError( - f'EXPECTED to roundtrip value given spec:\n' - f'ipc_pld_spec -> {ipc_pld_spec}\n' - f'value -> {send_value}: {send_type}\n' - ) - # continue - - else: - print( - f'{uid}: finished sending all values\n' - 'Should be exiting stream block!\n' - ) - - print(f'{uid}: exited streaming block!') - - # TODO: this won't be true bc in streaming phase we DO NOT - # msgspec check outbound msgs! - # -[ ] once we implement the receiver side `InvalidMsg` - # then we can expect it here? - # assert ( - # len(sent) - # == - # len([val - # for val, expect in - # expect_ipc_send.values() - # if expect is True]) - # ) - - -@pytest.mark.parametrize( - 'ipc_pld_spec', - [ - Any, - NamespacePath, - NamespacePath|None, # the "maybe" spec Bo - ], - ids=[ - 'any_type', - 'nsp_type', - 'maybe_nsp_type', - ] -) -@pytest.mark.parametrize( - 'add_codec_hooks', - [ - True, - False, - ], - ids=['use_codec_hooks', 'no_codec_hooks'], -) -def test_codec_hooks_mod( - debug_mode: bool, - ipc_pld_spec: Union[Type]|Any, - # send_value: None|str|NamespacePath, - add_codec_hooks: bool, -): - ''' - Audit the `.msg.MsgCodec` override apis details given our impl - uses `contextvars` to accomplish per `trio` task codec - application around an inter-proc-task-comms context. - - ''' - async def main(): - nsp = NamespacePath.from_ref(ex_func) - send_items: dict[Union, Any] = { - Union[None]: None, - Union[NamespacePath]: nsp, - Union[str]: str(nsp), - } - - # init default state for actor - chk_codec_applied( - expect_codec=_codec._def_tractor_codec, - ) - - async with tractor.open_nursery( - debug_mode=debug_mode, - ) as an: - p: tractor.Portal = await an.start_actor( - 'sub', - enable_modules=[__name__], - ) - - # TODO: 2 cases: - # - codec not modified -> decode nsp as `str` - # - codec modified with hooks -> decode nsp as - # `NamespacePath` - nsp_codec: MsgCodec = mk_custom_codec( - add_hooks=add_codec_hooks, - ) - with apply_codec(nsp_codec) as codec: - chk_codec_applied( - expect_codec=nsp_codec, - enter_value=codec, - ) - - expect_ipc_send: dict[str, tuple[Any, bool]] = {} - - report: str = ( - 'Parent report on send values with\n' - f'ipc_pld_spec: {ipc_pld_spec}\n' - ' ------ - ------\n' - ) - for ( - val_type_str, - val, - expect_send, - )in iter_maybe_sends( - send_items, - ipc_pld_spec, - add_codec_hooks=add_codec_hooks, - ): - report += ( - f'send_value: {val}: {type(val)} ' - f'=> expect_send: {expect_send}\n' - ) - expect_ipc_send[val_type_str] = ( - val, - expect_send, - ) - - print( - report + - ' ------ - ------\n' - ) - assert len(expect_ipc_send) == len(send_items) - # now try over real IPC with a the subactor - # expect_ipc_rountrip: bool = True - - if ( - subtypes := getattr( - ipc_pld_spec, '__args__', False - ) - ): - pld_types_str: str = '|'.join(subtypes) - # breakpoint() - else: - # TODO, use `.msg._exts` utils instead of this! - pld_types_str: str = ipc_pld_spec.__name__ - - expected_started = Started( - cid='cid', - # pld=str(pld_types_str), - pld=ipc_pld_spec, - ) - started_msg_bytes: bytes = nsp_codec.encode( - expected_started, - ) - # build list of values we expect to receive from - # the subactor. - expect_to_send: list[Any] = [ - val - for val, expect_send in expect_ipc_send.values() - if expect_send - ] - - pld_spec_type_strs: list[str] = _exts.enc_type_union(ipc_pld_spec) - - # XXX should raise an mte (`MsgTypeError`) - # when `add_codec_hooks == False` bc the input - # `expect_ipc_send` kwarg has a nsp which can't be - # serialized! - # - # TODO:can we ensure this happens from the - # `Return`-side (aka the sub) as well? - if not add_codec_hooks: - try: - async with p.open_context( - send_back_values, - expect_debug=debug_mode, - pld_spec_type_strs=pld_spec_type_strs, - add_hooks=add_codec_hooks, - started_msg_bytes=started_msg_bytes, - - # XXX NOTE bc we send a `NamespacePath` in this kwarg - expect_ipc_send=expect_ipc_send, - - ) as (ctx, first): - pytest.fail('ctx should fail to open without custom enc_hook!?') - - # this test passes bc we can go no further! - except MsgTypeError: - # teardown nursery - await p.cancel_actor() - return - - # TODO: send the original nsp here and - # test with `limit_msg_spec()` above? - # await tractor.pause() - print('PARENT opening IPC ctx!\n') - ctx: tractor.Context - ipc: tractor.MsgStream - async with ( - - # XXX should raise an mte (`MsgTypeError`) - # when `add_codec_hooks == False`.. - p.open_context( - send_back_values, - expect_debug=debug_mode, - pld_spec_type_strs=pld_spec_type_strs, - add_hooks=add_codec_hooks, - started_msg_bytes=nsp_codec.encode(expected_started), - expect_ipc_send=expect_ipc_send, - ) as (ctx, first), - - ctx.open_stream() as ipc, - ): - # ensure codec is still applied across - # `tractor.Context` + its embedded nursery. - chk_codec_applied( - expect_codec=nsp_codec, - enter_value=codec, - ) - print( - 'root: ENTERING CONTEXT BLOCK\n' - f'type(first): {type(first)}\n' - f'first: {first}\n' - ) - expect_to_send.remove(first) - - # TODO: explicit values we expect depending on - # codec config! - # assert first == first_val - # assert first == f'{__name__}:ex_func' - - async for next_sent in ipc: - print( - 'Parent: child sent next value\n' - f'{next_sent}: {type(next_sent)}\n' - ) - if expect_to_send: - expect_to_send.remove(next_sent) - else: - print('PARENT should terminate stream loop + block!') - - # all sent values should have arrived! - assert not expect_to_send - - await p.cancel_actor() - - trio.run(main) - - -def chk_pld_type( - payload_spec: Type[Struct]|Any, - pld: Any, - - expect_roundtrip: bool|None = None, - -) -> bool: - - pld_val_type: Type = type(pld) - - # TODO: verify that the overridden subtypes - # DO NOT have modified type-annots from original! - # 'Start', .pld: FuncSpec - # 'StartAck', .pld: IpcCtxSpec - # 'Stop', .pld: UNSEt - # 'Error', .pld: ErrorData - - codec: MsgCodec = mk_codec( - # NOTE: this ONLY accepts `PayloadMsg.pld` fields of a specified - # type union. - ipc_pld_spec=payload_spec, - ) - - # make a one-off dec to compare with our `MsgCodec` instance - # which does the below `mk_msg_spec()` call internally - ipc_msg_spec: Union[Type[Struct]] - msg_types: list[PayloadMsg[payload_spec]] - ( - ipc_msg_spec, - msg_types, - ) = mk_msg_spec( - payload_type_union=payload_spec, - ) - _enc = msgpack.Encoder() - _dec = msgpack.Decoder( - type=ipc_msg_spec or Any, # like `PayloadMsg[Any]` - ) - - assert ( - payload_spec - == - codec.pld_spec - ) - - # assert codec.dec == dec - # - # ^-XXX-^ not sure why these aren't "equal" but when cast - # to `str` they seem to match ?? .. kk - - assert ( - str(ipc_msg_spec) - == - str(codec.msg_spec) - == - str(_dec.type) - == - str(codec.dec.type) - ) - - # verify the boxed-type for all variable payload-type msgs. - if not msg_types: - breakpoint() - - roundtrip: bool|None = None - pld_spec_msg_names: list[str] = [ - td.__name__ for td in _payload_msgs - ] - for typedef in msg_types: - - skip_runtime_msg: bool = typedef.__name__ not in pld_spec_msg_names - if skip_runtime_msg: - continue - - pld_field = structs.fields(typedef)[1] - assert pld_field.type is payload_spec # TODO-^ does this need to work to get all subtypes to adhere? - - kwargs: dict[str, Any] = { - 'cid': '666', - 'pld': pld, - } - enc_msg: PayloadMsg = typedef(**kwargs) - - _wire_bytes: bytes = _enc.encode(enc_msg) - wire_bytes: bytes = codec.enc.encode(enc_msg) - assert _wire_bytes == wire_bytes - - ve: ValidationError|None = None - try: - dec_msg = codec.dec.decode(wire_bytes) - _dec_msg = _dec.decode(wire_bytes) - - # decoded msg and thus payload should be exactly same! - assert (roundtrip := ( - _dec_msg - == - dec_msg - == - enc_msg - )) - - if ( - expect_roundtrip is not None - and expect_roundtrip != roundtrip - ): - breakpoint() - - assert ( - pld - == - dec_msg.pld - == - enc_msg.pld - ) - # assert (roundtrip := (_dec_msg == enc_msg)) - - except ValidationError as _ve: - ve = _ve - roundtrip: bool = False - if pld_val_type is payload_spec: - raise ValueError( - 'Got `ValidationError` despite type-var match!?\n' - f'pld_val_type: {pld_val_type}\n' - f'payload_type: {payload_spec}\n' - ) from ve - - else: - # ow we good cuz the pld spec mismatched. - print( - 'Got expected `ValidationError` since,\n' - f'{pld_val_type} is not {payload_spec}\n' - ) - else: - if ( - payload_spec is not Any - and - pld_val_type is not payload_spec - ): - raise ValueError( - 'DID NOT `ValidationError` despite expected type match!?\n' - f'pld_val_type: {pld_val_type}\n' - f'payload_type: {payload_spec}\n' - ) - - # full code decode should always be attempted! - if roundtrip is None: - breakpoint() - - return roundtrip - - -# ?TODO? remove since covered in the newer `test_pldrx_limiting`? -def test_limit_msgspec( - debug_mode: bool, -): - ''' - Internals unit testing to verify that type-limiting an IPC ctx's - msg spec with `Pldrx.limit_plds()` results in various - encapsulated `msgspec` object settings and state. - - ''' - async def main(): - async with tractor.open_root_actor( - debug_mode=debug_mode, - ): - # ensure we can round-trip a boxing `PayloadMsg` - assert chk_pld_type( - payload_spec=Any, - pld=None, - expect_roundtrip=True, - ) - - # verify that a mis-typed payload value won't decode - assert not chk_pld_type( - payload_spec=int, - pld='doggy', - ) - - # parametrize the boxed `.pld` type as a custom-struct - # and ensure that parametrization propagates - # to all payload-msg-spec-able subtypes! - class CustomPayload(Struct): - name: str - value: Any - - assert not chk_pld_type( - payload_spec=CustomPayload, - pld='doggy', - ) - - assert chk_pld_type( - payload_spec=CustomPayload, - pld=CustomPayload(name='doggy', value='urmom') - ) - - # yah, we can `.pause_from_sync()` now! - # breakpoint() - - trio.run(main) - - def enc_nsp(obj: Any) -> Any: actor: Actor = tractor.current_actor( err_on_no_runtime=False, @@ -1034,3 +240,816 @@ def test_custom_extension_types( except TypeError: if not add_codec_hooks: pass + +@tractor.context +async def sleep_forever_in_sub( + ctx: Context, +) -> None: + await trio.sleep_forever() + + +def mk_custom_codec( + add_hooks: bool, + +) -> tuple[ + MsgCodec, # encode to send + MsgDec, # pld receive-n-decode +]: + ''' + Create custom `msgpack` enc/dec-hooks and set a `Decoder` + which only loads `pld_spec` (like `NamespacePath`) types. + + ''' + + # XXX NOTE XXX: despite defining `NamespacePath` as a type + # field on our `PayloadMsg.pld`, we still need a enc/dec_hook() pair + # to cast to/from that type on the wire. See the docs: + # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types + + # if pld_spec is Any: + # pld_spec = Raw + + nsp_codec: MsgCodec = mk_codec( + # ipc_pld_spec=Raw, # default! + + # NOTE XXX: the encode hook MUST be used no matter what since + # our `NamespacePath` is not any of a `Any` native type nor + # a `msgspec.Struct` subtype - so `msgspec` has no way to know + # how to encode it unless we provide the custom hook. + # + # AGAIN that is, regardless of whether we spec an + # `Any`-decoded-pld the enc has no knowledge (by default) + # how to enc `NamespacePath` (nsp), so we add a custom + # hook to do that ALWAYS. + enc_hook=enc_nsp if add_hooks else None, + + # XXX NOTE: pretty sure this is mutex with the `type=` to + # `Decoder`? so it won't work in tandem with the + # `ipc_pld_spec` passed above? + ext_types=[NamespacePath], + ) + # dec_hook=dec_nsp if add_hooks else None, + return nsp_codec + + +@pytest.mark.parametrize( + 'limit_plds_args', + [ + ( + {'dec_hook': None, 'ext_types': None}, + None, + ), + ( + {'dec_hook': dec_nsp, 'ext_types': None}, + TypeError, + ), + ( + {'dec_hook': dec_nsp, 'ext_types': [NamespacePath]}, + None, + ), + ( + {'dec_hook': dec_nsp, 'ext_types': [NamespacePath|None]}, + None, + ), + ], + ids=[ + 'no_hook_no_ext_types', + 'only_hook', + 'hook_and_ext_types', + 'hook_and_ext_types_w_null', + ] +) +def test_pld_limiting_usage( + limit_plds_args: tuple[dict, Exception|None], +): + ''' + Verify `dec_hook()` and `ext_types` need to either both be provided + or we raise a explanator type-error. + + ''' + kwargs, maybe_err = limit_plds_args + async def main(): + async with tractor.open_nursery() as an: # just to open runtime + + # XXX SHOULD NEVER WORK outside an ipc ctx scope! + try: + with limit_plds(**kwargs): + pass + except RuntimeError: + pass + + p: tractor.Portal = await an.start_actor( + 'sub', + enable_modules=[__name__], + ) + async with ( + p.open_context( + sleep_forever_in_sub + ) as (ctx, first), + ): + try: + with limit_plds(**kwargs): + pass + except maybe_err as exc: + assert type(exc) is maybe_err + pass + + +def chk_codec_applied( + expect_codec: MsgCodec|None, + enter_value: MsgCodec|None = None, + +) -> MsgCodec: + ''' + buncha sanity checks ensuring that the IPC channel's + context-vars are set to the expected codec and that are + ctx-var wrapper APIs match the same. + + ''' + # TODO: play with tricyle again, bc this is supposed to work + # the way we want? + # + # TreeVar + # task: trio.Task = trio.lowlevel.current_task() + # curr_codec = _ctxvar_MsgCodec.get_in(task) + + # ContextVar + # task_ctx: Context = task.context + # assert _ctxvar_MsgCodec in task_ctx + # curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec] + if expect_codec is None: + assert enter_value is None + return + + # NOTE: currently we use this! + # RunVar + curr_codec: MsgCodec = current_codec() + last_read_codec = _ctxvar_MsgCodec.get() + # assert curr_codec is last_read_codec + + assert ( + (same_codec := expect_codec) is + # returned from `mk_codec()` + + # yielded value from `apply_codec()` + + # read from current task's `contextvars.Context` + curr_codec is + last_read_codec + + # the default `msgspec` settings + is not _codec._def_msgspec_codec + is not _codec._def_tractor_codec + ) + + if enter_value: + assert enter_value is same_codec + + +# def iter_maybe_sends( +# send_items: dict[Union[Type], Any] | list[tuple], +# ipc_pld_spec: Union[Type] | Any, +# add_codec_hooks: bool, + +# codec: MsgCodec|None = None, + +# ) -> tuple[Any, bool]: + +# if isinstance(send_items, dict): +# send_items = send_items.items() + +# for ( +# send_type_spec, +# send_value, +# ) in send_items: + +# expect_roundtrip: bool = False + +# # values-to-typespec santiy +# send_type = type(send_value) +# assert send_type == send_type_spec or ( +# (subtypes := getattr(send_type_spec, '__args__', None)) +# and send_type in subtypes +# ) + +# spec_subtypes: set[Union[Type]] = ( +# getattr( +# ipc_pld_spec, +# '__args__', +# {ipc_pld_spec,}, +# ) +# ) +# send_in_spec: bool = ( +# send_type == ipc_pld_spec +# or ( +# ipc_pld_spec != Any +# and # presume `Union` of types +# send_type in spec_subtypes +# ) +# or ( +# ipc_pld_spec == Any +# and +# send_type != NamespacePath +# ) +# ) +# expect_roundtrip = ( +# send_in_spec +# # any spec should support all other +# # builtin py values that we send +# # except our custom nsp type which +# # we should be able to send as long +# # as we provide the custom codec hooks. +# or ( +# ipc_pld_spec == Any +# and +# send_type == NamespacePath +# and +# add_codec_hooks +# ) +# ) + +# if codec is not None: +# # XXX FIRST XXX ensure roundtripping works +# # before touching any IPC primitives/APIs. +# wire_bytes: bytes = codec.encode( +# Started( +# cid='blahblah', +# pld=send_value, +# ) +# ) +# # NOTE: demonstrates the decoder loading +# # to via our native SCIPP msg-spec +# # (structurred-conc-inter-proc-protocol) +# # implemented as per, +# try: +# msg: Started = codec.decode(wire_bytes) +# if not expect_roundtrip: +# pytest.fail( +# f'NOT-EXPECTED able to roundtrip value given spec:\n' +# f'ipc_pld_spec -> {ipc_pld_spec}\n' +# f'value -> {send_value}: {send_type}\n' +# ) + +# pld = msg.pld +# assert pld == send_value + +# except ValidationError: +# if expect_roundtrip: +# pytest.fail( +# f'EXPECTED to roundtrip value given spec:\n' +# f'ipc_pld_spec -> {ipc_pld_spec}\n' +# f'value -> {send_value}: {send_type}\n' +# ) + +# yield ( +# str(send_type), +# send_value, +# expect_roundtrip, +# ) + + +@tractor.context +async def send_back_values( + ctx: Context, + rent_pld_spec_type_strs: list[str], + add_hooks: bool, + # expect_ipc_send: dict[str, tuple[Any, bool]], + + # expect_debug: bool, + # started_msg_bytes: bytes, + +) -> None: + ''' + Setup up a custom codec to load instances of `NamespacePath` + and ensure we can round trip a func ref with our parent. + + ''' + uid: tuple = tractor.current_actor().uid + + # init state in sub-actor should be default + chk_codec_applied( + expect_codec=_codec._def_tractor_codec, + ) + + # load pld spec from input str + rent_pld_spec = _exts.dec_type_union( + rent_pld_spec_type_strs, + mods=[ + importlib.import_module(__name__), + ], + ) + rent_pld_spec_types: set[Type] = _codec.unpack_spec_types( + rent_pld_spec, + ) + + # ONLY add ext-hooks if the rent specified a non-std type! + add_hooks: bool = ( + NamespacePath in rent_pld_spec_types + ) + + # same as on parent side config. + nsp_codec: MsgCodec|None = None + if add_hooks: + nsp_codec = mk_codec( + enc_hook=enc_nsp, + ext_types=[NamespacePath], + ) + + with ( + maybe_apply_codec(nsp_codec) as codec, + limit_plds( + rent_pld_spec, + dec_hook=dec_nsp if add_hooks else None, + ext_types=[NamespacePath] if add_hooks else None, + ) as pld_dec, + ): + # ?XXX? SHOULD WE NOT be swapping the global codec since it + # breaks `Context.started()` roundtripping checks?? + chk_codec_applied( + expect_codec=nsp_codec, + enter_value=codec, + ) + + # ?TODO, mismatch case(s)? + # + # ensure pld spec matches on both sides + ctx_pld_dec: MsgDec = ctx._pld_rx._pld_dec + assert pld_dec is ctx_pld_dec + child_pld_spec: Type = pld_dec.spec + child_pld_spec_types: set[Type] = _codec.unpack_spec_types( + child_pld_spec, + ) + assert ( + # child_pld_spec == rent_pld_spec + child_pld_spec_types.issuperset( + rent_pld_spec_types + ) + ) + + # expected_pld_spec_str: str = str(ipc_spec) + # assert ( + # pld_spec_str == expected_pld_spec_str + # and + # ipc_pld_spec == ipc_spec + # ) + + # ?TODO, try loop for each of the types in pld-superset? + # + # for send_value in [ + # nsp, + # str(nsp), + # None, + # ]: + nsp = NamespacePath.from_ref(ex_func) + try: + print( + f'{uid}: attempting to `.started({nsp})`\n' + f'\n' + f'rent_pld_spec: {rent_pld_spec}\n' + f'child_pld_spec: {child_pld_spec}\n' + f'codec: {codec}\n' + ) + await ctx.started(nsp) + + except tractor.MsgTypeError as _mte: + mte = _mte + + # false -ve case + if add_hooks: + raise RuntimeError( + f'EXPECTED to `.started()` value given spec ??\n\n' + f'child_pld_spec -> {child_pld_spec}\n' + f'value = {nsp}: {type(nsp)}\n' + ) + + # true -ve case + raise mte + + # TODO: maybe we should add our own wrapper error so as to + # be interchange-lib agnostic? + # -[ ] the error type is wtv is raised from the hook so we + # could also require a type-class of errors for + # indicating whether the hook-failure can be handled by + # a nasty-dialog-unprot sub-sys? + except TypeError as typerr: + # false -ve + if add_hooks: + raise RuntimeError('Should have been able to send `nsp`??') + + # true -ve + print('Failed to send `nsp` due to no ext hooks set!') + raise typerr + + # now try sending a set of valid and invalid plds to ensure + # the pld spec is respected. + sent: list[Any] = [] + async with ctx.open_stream() as ipc: + print( + f'{uid}: streaming all pld types to rent..' + ) + + # for send_value, expect_send in iter_send_val_items: + for send_value in [ + nsp, + str(nsp), + None, + ]: + send_type: Type = type(send_value) + print( + f'{uid}: SENDING NEXT pld\n' + f'send_type: {send_type}\n' + f'send_value: {send_value}\n' + ) + try: + await ipc.send(send_value) + sent.append(send_value) + + except ValidationError as valerr: + print(f'{uid} FAILED TO SEND {send_value}!') + + # false -ve + if add_hooks: + raise RuntimeError( + f'EXPECTED to roundtrip value given spec:\n' + f'rent_pld_spec -> {rent_pld_spec}\n' + f'child_pld_spec -> {child_pld_spec}\n' + f'value = {send_value}: {send_type}\n' + ) + + # true -ve + raise valerr + # continue + + else: + print( + f'{uid}: finished sending all values\n' + 'Should be exiting stream block!\n' + ) + + print(f'{uid}: exited streaming block!') + + + +@cm +def maybe_apply_codec(codec: MsgCodec|None) -> MsgCodec|None: + if codec is None: + yield None + return + + with apply_codec(codec) as codec: + yield codec + + +@pytest.mark.parametrize( + 'pld_spec', + [ + Any, + NamespacePath, + NamespacePath|None, # the "maybe" spec Bo + ], + ids=[ + 'any_type', + 'only_nsp_ext', + 'maybe_nsp_ext', + ] +) +@pytest.mark.parametrize( + 'add_hooks', + [ + True, + False, + ], + ids=[ + 'use_codec_hooks', + 'no_codec_hooks', + ], +) +def test_ext_types_over_ipc( + debug_mode: bool, + pld_spec: Union[Type], + add_hooks: bool, +): + ''' + Ensure we can support extension types coverted using + `enc/dec_hook()`s passed to the `.msg.limit_plds()` API + and that sane errors happen when we try do the same without + the codec hooks. + + ''' + pld_types: set[Type] = _codec.unpack_spec_types(pld_spec) + + async def main(): + + # sanity check the default pld-spec beforehand + chk_codec_applied( + expect_codec=_codec._def_tractor_codec, + ) + + # extension type we want to send as msg payload + nsp = NamespacePath.from_ref(ex_func) + + # ^NOTE, 2 cases: + # - codec hooks noto added -> decode nsp as `str` + # - codec with hooks -> decode nsp as `NamespacePath` + nsp_codec: MsgCodec|None = None + if ( + NamespacePath in pld_types + and + add_hooks + ): + nsp_codec = mk_codec( + enc_hook=enc_nsp, + ext_types=[NamespacePath], + ) + + async with tractor.open_nursery( + debug_mode=debug_mode, + ) as an: + p: tractor.Portal = await an.start_actor( + 'sub', + enable_modules=[__name__], + ) + with ( + maybe_apply_codec(nsp_codec) as codec, + ): + chk_codec_applied( + expect_codec=nsp_codec, + enter_value=codec, + ) + rent_pld_spec_type_strs: list[str] = _exts.enc_type_union(pld_spec) + + # XXX should raise an mte (`MsgTypeError`) + # when `add_hooks == False` bc the input + # `expect_ipc_send` kwarg has a nsp which can't be + # serialized! + # + # TODO:can we ensure this happens from the + # `Return`-side (aka the sub) as well? + try: + ctx: tractor.Context + ipc: tractor.MsgStream + async with ( + + # XXX should raise an mte (`MsgTypeError`) + # when `add_hooks == False`.. + p.open_context( + send_back_values, + # expect_debug=debug_mode, + rent_pld_spec_type_strs=rent_pld_spec_type_strs, + add_hooks=add_hooks, + # expect_ipc_send=expect_ipc_send, + ) as (ctx, first), + + ctx.open_stream() as ipc, + ): + with ( + limit_plds( + pld_spec, + dec_hook=dec_nsp if add_hooks else None, + ext_types=[NamespacePath] if add_hooks else None, + ) as pld_dec, + ): + ctx_pld_dec: MsgDec = ctx._pld_rx._pld_dec + assert pld_dec is ctx_pld_dec + + # if ( + # not add_hooks + # and + # NamespacePath in + # ): + # pytest.fail('ctx should fail to open without custom enc_hook!?') + + await ipc.send(nsp) + nsp_rt = await ipc.receive() + + assert nsp_rt == nsp + assert nsp_rt.load_ref() is ex_func + + # this test passes bc we can go no further! + except MsgTypeError: + if not add_hooks: + # teardown nursery + await p.cancel_actor() + return + + await p.cancel_actor() + + if ( + NamespacePath in pld_types + and + add_hooks + ): + trio.run(main) + + else: + with pytest.raises( + expected_exception=tractor.RemoteActorError, + ) as excinfo: + trio.run(main) + + exc = excinfo.value + # bc `.started(nsp: NamespacePath)` will raise + assert exc.boxed_type is TypeError + + +# def chk_pld_type( +# payload_spec: Type[Struct]|Any, +# pld: Any, + +# expect_roundtrip: bool|None = None, + +# ) -> bool: + +# pld_val_type: Type = type(pld) + +# # TODO: verify that the overridden subtypes +# # DO NOT have modified type-annots from original! +# # 'Start', .pld: FuncSpec +# # 'StartAck', .pld: IpcCtxSpec +# # 'Stop', .pld: UNSEt +# # 'Error', .pld: ErrorData + +# codec: MsgCodec = mk_codec( +# # NOTE: this ONLY accepts `PayloadMsg.pld` fields of a specified +# # type union. +# ipc_pld_spec=payload_spec, +# ) + +# # make a one-off dec to compare with our `MsgCodec` instance +# # which does the below `mk_msg_spec()` call internally +# ipc_msg_spec: Union[Type[Struct]] +# msg_types: list[PayloadMsg[payload_spec]] +# ( +# ipc_msg_spec, +# msg_types, +# ) = mk_msg_spec( +# payload_type_union=payload_spec, +# ) +# _enc = msgpack.Encoder() +# _dec = msgpack.Decoder( +# type=ipc_msg_spec or Any, # like `PayloadMsg[Any]` +# ) + +# assert ( +# payload_spec +# == +# codec.pld_spec +# ) + +# # assert codec.dec == dec +# # +# # ^-XXX-^ not sure why these aren't "equal" but when cast +# # to `str` they seem to match ?? .. kk + +# assert ( +# str(ipc_msg_spec) +# == +# str(codec.msg_spec) +# == +# str(_dec.type) +# == +# str(codec.dec.type) +# ) + +# # verify the boxed-type for all variable payload-type msgs. +# if not msg_types: +# breakpoint() + +# roundtrip: bool|None = None +# pld_spec_msg_names: list[str] = [ +# td.__name__ for td in _payload_msgs +# ] +# for typedef in msg_types: + +# skip_runtime_msg: bool = typedef.__name__ not in pld_spec_msg_names +# if skip_runtime_msg: +# continue + +# pld_field = structs.fields(typedef)[1] +# assert pld_field.type is payload_spec # TODO-^ does this need to work to get all subtypes to adhere? + +# kwargs: dict[str, Any] = { +# 'cid': '666', +# 'pld': pld, +# } +# enc_msg: PayloadMsg = typedef(**kwargs) + +# _wire_bytes: bytes = _enc.encode(enc_msg) +# wire_bytes: bytes = codec.enc.encode(enc_msg) +# assert _wire_bytes == wire_bytes + +# ve: ValidationError|None = None +# try: +# dec_msg = codec.dec.decode(wire_bytes) +# _dec_msg = _dec.decode(wire_bytes) + +# # decoded msg and thus payload should be exactly same! +# assert (roundtrip := ( +# _dec_msg +# == +# dec_msg +# == +# enc_msg +# )) + +# if ( +# expect_roundtrip is not None +# and expect_roundtrip != roundtrip +# ): +# breakpoint() + +# assert ( +# pld +# == +# dec_msg.pld +# == +# enc_msg.pld +# ) +# # assert (roundtrip := (_dec_msg == enc_msg)) + +# except ValidationError as _ve: +# ve = _ve +# roundtrip: bool = False +# if pld_val_type is payload_spec: +# raise ValueError( +# 'Got `ValidationError` despite type-var match!?\n' +# f'pld_val_type: {pld_val_type}\n' +# f'payload_type: {payload_spec}\n' +# ) from ve + +# else: +# # ow we good cuz the pld spec mismatched. +# print( +# 'Got expected `ValidationError` since,\n' +# f'{pld_val_type} is not {payload_spec}\n' +# ) +# else: +# if ( +# payload_spec is not Any +# and +# pld_val_type is not payload_spec +# ): +# raise ValueError( +# 'DID NOT `ValidationError` despite expected type match!?\n' +# f'pld_val_type: {pld_val_type}\n' +# f'payload_type: {payload_spec}\n' +# ) + +# # full code decode should always be attempted! +# if roundtrip is None: +# breakpoint() + +# return roundtrip + + +# ?TODO? maybe remove since covered in the newer `test_pldrx_limiting` +# via end-2-end testing of all this? +# -[ ] IOW do we really NEED this lowlevel unit testing? +# +# def test_limit_msgspec( +# debug_mode: bool, +# ): +# ''' +# Internals unit testing to verify that type-limiting an IPC ctx's +# msg spec with `Pldrx.limit_plds()` results in various +# encapsulated `msgspec` object settings and state. + +# ''' +# async def main(): +# async with tractor.open_root_actor( +# debug_mode=debug_mode, +# ): +# # ensure we can round-trip a boxing `PayloadMsg` +# assert chk_pld_type( +# payload_spec=Any, +# pld=None, +# expect_roundtrip=True, +# ) + +# # verify that a mis-typed payload value won't decode +# assert not chk_pld_type( +# payload_spec=int, +# pld='doggy', +# ) + +# # parametrize the boxed `.pld` type as a custom-struct +# # and ensure that parametrization propagates +# # to all payload-msg-spec-able subtypes! +# class CustomPayload(Struct): +# name: str +# value: Any + +# assert not chk_pld_type( +# payload_spec=CustomPayload, +# pld='doggy', +# ) + +# assert chk_pld_type( +# payload_spec=CustomPayload, +# pld=CustomPayload(name='doggy', value='urmom') +# ) + +# # yah, we can `.pause_from_sync()` now! +# # breakpoint() + +# trio.run(main)