diff --git a/tests/test_caps_based_msging.py b/tests/test_caps_based_msging.py index b42d9e3..acc1f30 100644 --- a/tests/test_caps_based_msging.py +++ b/tests/test_caps_based_msging.py @@ -5,6 +5,7 @@ Low-level functional audits for our B~) ''' +import typing from typing import ( Any, Type, @@ -23,7 +24,9 @@ from msgspec import ( ValidationError, ) import pytest + import tractor +from tractor import _state from tractor.msg import ( _codec, _ctxvar_MsgCodec, @@ -34,12 +37,9 @@ from tractor.msg import ( apply_codec, current_codec, ) -from tractor.msg import ( - types, -) -from tractor import _state from tractor.msg.types import ( - # PayloadT, + _payload_msgs, + log, Msg, Started, mk_msg_spec, @@ -62,17 +62,14 @@ def test_msg_spec_xor_pld_spec(): ) -def ex_func(*args): - print(f'ex_func({args})') - - def mk_custom_codec( pld_spec: Union[Type]|Any, + add_hooks: bool, ) -> MsgCodec: ''' Create custom `msgpack` enc/dec-hooks and set a `Decoder` - which only loads `NamespacePath` types. + which only loads `pld_spec` (like `NamespacePath`) types. ''' uid: tuple[str, str] = tractor.current_actor().uid @@ -83,61 +80,75 @@ def mk_custom_codec( # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types def enc_nsp(obj: Any) -> Any: + print(f'{uid} ENC HOOK') match obj: case NamespacePath(): print( f'{uid}: `NamespacePath`-Only ENCODE?\n' - f'type: {type(obj)}\n' - f'obj: {obj}\n' + f'obj-> `{obj}`: {type(obj)}\n' ) - + # if type(obj) != NamespacePath: + # breakpoint() return str(obj) - logmsg: str = ( - f'{uid}: Encoding `{obj}: <{type(obj)}>` not supported' - f'type: {type(obj)}\n' - f'obj: {obj}\n' + print( + f'{uid}\n' + 'CUSTOM ENCODE\n' + f'obj-arg-> `{obj}`: {type(obj)}\n' + ) + logmsg: str = ( + f'{uid}\n' + 'FAILED ENCODE\n' + f'obj-> `{obj}: {type(obj)}`\n' ) - print(logmsg) raise NotImplementedError(logmsg) def dec_nsp( - type: Type, + obj_type: Type, obj: Any, ) -> Any: print( - f'{uid}: CUSTOM DECODE\n' - f'input type: {type}\n' - f'obj: {obj}\n' - f'type(obj): `{type(obj).__class__}`\n' + f'{uid}\n' + 'CUSTOM DECODE\n' + f'type-arg-> {obj_type}\n' + f'obj-arg-> `{obj}`: {type(obj)}\n' ) nsp = None - # This never seems to hit? - if isinstance(obj, Msg): - print(f'Msg type: {obj}') - if ( - type is NamespacePath + obj_type is NamespacePath and isinstance(obj, str) and ':' in obj ): nsp = NamespacePath(obj) + # TODO: we could built a generic handler using + # JUST matching the obj_type part? + # nsp = obj_type(obj) if nsp: print(f'Returning NSP instance: {nsp}') return nsp logmsg: str = ( - f'{uid}: Decoding `{obj}: <{type(obj)}>` not supported' - f'input type: {type(obj)}\n' - f'obj: {obj}\n' - f'type(obj): `{type(obj).__class__}`\n' + f'{uid}\n' + 'FAILED DECODE\n' + f'type-> {obj_type}\n' + f'obj-arg-> `{obj}`: {type(obj)}\n' ) - print(logmsg) - raise NotImplementedError(logmsg) - + # TODO: figure out the ignore subsys for this! + # -[ ] option whether to defense-relay backc the msg + # inside an `Invalid`/`Ignore` + # -[ ] how to make this handling pluggable such that a + # `Channel`/`MsgTransport` can intercept and process + # back msgs either via exception handling or some other + # signal? + log.warning(logmsg) + # NOTE: this delivers the invalid + # value up to `msgspec`'s decoding + # machinery for error raising. + return obj + # raise NotImplementedError(logmsg) nsp_codec: MsgCodec = mk_codec( ipc_pld_spec=pld_spec, @@ -151,97 +162,32 @@ def mk_custom_codec( # `Any`-decoded-pld the enc has no knowledge (by default) # how to enc `NamespacePath` (nsp), so we add a custom # hook to do that ALWAYS. - enc_hook=enc_nsp, + enc_hook=enc_nsp if add_hooks else None, # XXX NOTE: pretty sure this is mutex with the `type=` to # `Decoder`? so it won't work in tandem with the # `ipc_pld_spec` passed above? - dec_hook=dec_nsp, + dec_hook=dec_nsp if add_hooks else None, ) return nsp_codec -@tractor.context -async def send_back_nsp( - ctx: Context, - expect_debug: bool, - use_any_spec: bool, - -) -> None: - ''' - Setup up a custom codec to load instances of `NamespacePath` - and ensure we can round trip a func ref with our parent. - - ''' - # debug mode sanity check - assert expect_debug == _state.debug_mode() - - # task: trio.Task = trio.lowlevel.current_task() - - # TreeVar - # curr_codec = _ctxvar_MsgCodec.get_in(task) - - # ContextVar - # task_ctx: Context = task.context - # assert _ctxvar_MsgCodec not in task_ctx - - curr_codec = _ctxvar_MsgCodec.get() - assert curr_codec is _codec._def_tractor_codec - - if use_any_spec: - pld_spec = Any - else: - # NOTE: don't need the |None here since - # the parent side will never send `None` like - # we do here in the implicit return at the end of this - # `@context` body. - pld_spec = NamespacePath # |None - - nsp_codec: MsgCodec = mk_custom_codec( - pld_spec=pld_spec, - ) - with apply_codec(nsp_codec) as codec: - chk_codec_applied( - custom_codec=nsp_codec, - enter_value=codec, - ) - - # ensure roundtripping works locally - nsp = NamespacePath.from_ref(ex_func) - wire_bytes: bytes = nsp_codec.encode( - Started( - cid=ctx.cid, - pld=nsp - ) - ) - msg: Started = nsp_codec.decode(wire_bytes) - pld = msg.pld - assert pld == nsp - - await ctx.started(nsp) - async with ctx.open_stream() as ipc: - async for msg in ipc: - - if use_any_spec: - assert msg == f'{__name__}:ex_func' - - # TODO: as per below - # assert isinstance(msg, NamespacePath) - assert isinstance(msg, str) - else: - assert isinstance(msg, NamespacePath) - - await ipc.send(msg) - - def chk_codec_applied( - custom_codec: MsgCodec, - enter_value: MsgCodec, + expect_codec: MsgCodec, + enter_value: MsgCodec|None = None, + ) -> MsgCodec: + ''' + buncha sanity checks ensuring that the IPC channel's + context-vars are set to the expected codec and that are + ctx-var wrapper APIs match the same. - # task: trio.Task = trio.lowlevel.current_task() - + ''' + # TODO: play with tricyle again, bc this is supposed to work + # the way we want? + # # TreeVar + # task: trio.Task = trio.lowlevel.current_task() # curr_codec = _ctxvar_MsgCodec.get_in(task) # ContextVar @@ -249,46 +195,358 @@ def chk_codec_applied( # assert _ctxvar_MsgCodec in task_ctx # curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec] + # NOTE: currently we use this! # RunVar - curr_codec: MsgCodec = _ctxvar_MsgCodec.get() + curr_codec: MsgCodec = current_codec() last_read_codec = _ctxvar_MsgCodec.get() - assert curr_codec is last_read_codec + # assert curr_codec is last_read_codec assert ( + (same_codec := expect_codec) is # returned from `mk_codec()` - custom_codec is # yielded value from `apply_codec()` - enter_value is # read from current task's `contextvars.Context` curr_codec is - - # public API for all of the above - current_codec() + last_read_codec # the default `msgspec` settings is not _codec._def_msgspec_codec is not _codec._def_tractor_codec ) + if enter_value: + enter_value is same_codec + + +def iter_maybe_sends( + send_items: dict[Union[Type], Any] | list[tuple], + ipc_pld_spec: Union[Type] | Any, + add_codec_hooks: bool, + + codec: MsgCodec|None = None, + +) -> tuple[Any, bool]: + + if isinstance(send_items, dict): + send_items = send_items.items() + + for ( + send_type_spec, + send_value, + ) in send_items: + + expect_roundtrip: bool = False + + # values-to-typespec santiy + send_type = type(send_value) + assert send_type == send_type_spec or ( + (subtypes := getattr(send_type_spec, '__args__', None)) + and send_type in subtypes + ) + + spec_subtypes: set[Union[Type]] = ( + getattr( + ipc_pld_spec, + '__args__', + {ipc_pld_spec,}, + ) + ) + send_in_spec: bool = ( + send_type == ipc_pld_spec + or ( + ipc_pld_spec != Any + and # presume `Union` of types + send_type in spec_subtypes + ) + or ( + ipc_pld_spec == Any + and + send_type != NamespacePath + ) + ) + expect_roundtrip = ( + send_in_spec + # any spec should support all other + # builtin py values that we send + # except our custom nsp type which + # we should be able to send as long + # as we provide the custom codec hooks. + or ( + ipc_pld_spec == Any + and + send_type == NamespacePath + and + add_codec_hooks + ) + ) + + if codec is not None: + # XXX FIRST XXX ensure roundtripping works + # before touching any IPC primitives/APIs. + wire_bytes: bytes = codec.encode( + Started( + cid='blahblah', + pld=send_value, + ) + ) + # NOTE: demonstrates the decoder loading + # to via our native SCIPP msg-spec + # (structurred-conc-inter-proc-protocol) + # implemented as per, + try: + msg: Started = codec.decode(wire_bytes) + if not expect_roundtrip: + pytest.fail( + f'NOT-EXPECTED able to roundtrip value given spec:\n' + f'ipc_pld_spec -> {ipc_pld_spec}\n' + f'value -> {send_value}: {send_type}\n' + ) + + pld = msg.pld + assert pld == send_value + + except ValidationError: + if expect_roundtrip: + pytest.fail( + f'EXPECTED to roundtrip value given spec:\n' + f'ipc_pld_spec -> {ipc_pld_spec}\n' + f'value -> {send_value}: {send_type}\n' + ) + + yield ( + str(send_type), + send_value, + expect_roundtrip, + ) + + +def dec_type_union( + type_names: list[str], +) -> Type: + ''' + Look up types by name, compile into a list and then create and + return a `typing.Union` from the full set. + + ''' + import importlib + types: list[Type] = [] + for type_name in type_names: + for ns in [ + typing, + importlib.import_module(__name__), + ]: + if type_ref := getattr( + ns, + type_name, + False, + ): + types.append(type_ref) + + # special case handling only.. + # ipc_pld_spec: Union[Type] = eval( + # pld_spec_str, + # {}, # globals + # {'typing': typing}, # locals + # ) + + return Union[*types] + + +def enc_type_union( + union_or_type: Union[Type]|Type, +) -> list[str]: + ''' + Encode a type-union or single type to a list of type-name-strings + ready for IPC interchange. + + ''' + type_strs: list[str] = [] + for typ in getattr( + union_or_type, + '__args__', + {union_or_type,}, + ): + type_strs.append(typ.__qualname__) + + return type_strs + + +@tractor.context +async def send_back_nsp( + ctx: Context, + expect_debug: bool, + pld_spec_type_strs: list[str], + add_hooks: bool, + started_msg_bytes: bytes, + expect_ipc_send: dict[str, tuple[Any, bool]], + +) -> None: + ''' + Setup up a custom codec to load instances of `NamespacePath` + and ensure we can round trip a func ref with our parent. + + ''' + # debug mode sanity check (prolly superfluous but, meh) + assert expect_debug == _state.debug_mode() + + # init state in sub-actor should be default + chk_codec_applied( + expect_codec=_codec._def_tractor_codec, + ) + + # load pld spec from input str + ipc_pld_spec = dec_type_union( + pld_spec_type_strs, + ) + pld_spec_str = str(ipc_pld_spec) + + # same as on parent side config. + nsp_codec: MsgCodec = mk_custom_codec( + pld_spec=ipc_pld_spec, + add_hooks=add_hooks, + ) + with apply_codec(nsp_codec) as codec: + chk_codec_applied( + expect_codec=nsp_codec, + enter_value=codec, + ) + + print( + 'CHILD attempting `Started`-bytes DECODE..\n' + ) + try: + msg: Started = nsp_codec.decode(started_msg_bytes) + expected_pld_spec_str: str = msg.pld + assert pld_spec_str == expected_pld_spec_str + + # TODO: maybe we should add our own wrapper error so as to + # be interchange-lib agnostic? + # -[ ] the error type is wtv is raised from the hook so we + # could also require a type-class of errors for + # indicating whether the hook-failure can be handled by + # a nasty-dialog-unprot sub-sys? + except ValidationError: + + # NOTE: only in the `Any` spec case do we expect this to + # work since otherwise no spec covers a plain-ol' + # `.pld: str` + if pld_spec_str == 'Any': + raise + else: + print( + 'CHILD (correctly) unable to DECODE `Started`-bytes\n' + f'{started_msg_bytes}\n' + ) + + iter_send_val_items = iter(expect_ipc_send.values()) + sent: list[Any] = [] + for send_value, expect_send in iter_send_val_items: + try: + print( + f'CHILD attempting to `.started({send_value})`\n' + f'=> expect_send: {expect_send}\n' + f'SINCE, ipc_pld_spec: {ipc_pld_spec}\n' + f'AND, codec: {codec}\n' + ) + await ctx.started(send_value) + sent.append(send_value) + if not expect_send: + + # XXX NOTE XXX THIS WON'T WORK WITHOUT SPECIAL + # `str` handling! or special debug mode IPC + # msgs! + # await tractor.pause() + + raise RuntimeError( + # pytest.fail( + f'NOT-EXPECTED able to roundtrip value given spec:\n' + f'ipc_pld_spec -> {ipc_pld_spec}\n' + f'value -> {send_value}: {type(send_value)}\n' + ) + + break # move on to streaming block.. + + except NotImplementedError: + print('FAILED ENCODE!') + + except tractor.MsgTypeError: + # await tractor.pause() + if expect_send: + pytest.fail( + f'EXPECTED to `.started()` value given spec:\n' + f'ipc_pld_spec -> {ipc_pld_spec}\n' + f'value -> {send_value}: {type(send_value)}\n' + ) + + async with ctx.open_stream() as ipc: + for send_value, expect_send in iter_send_val_items: + send_type: Type = type(send_value) + print( + 'CHILD report on send value\n' + f'ipc_pld_spec: {ipc_pld_spec}\n' + f'expect_send: {expect_send}\n' + f'val: {send_value}\n' + ) + try: + await ipc.send(send_value) + sent.append(send_value) + if not expect_send: + pytest.fail( + f'NOT-EXPECTED able to roundtrip value given spec:\n' + f'ipc_pld_spec -> {ipc_pld_spec}\n' + f'value -> {send_value}: {send_type}\n' + ) + except ValidationError: + if expect_send: + pytest.fail( + f'EXPECTED to roundtrip value given spec:\n' + f'ipc_pld_spec -> {ipc_pld_spec}\n' + f'value -> {send_value}: {send_type}\n' + ) + continue + + assert ( + len(sent) + == + len([val + for val, expect in + expect_ipc_send.values() + if expect is True]) + ) + + +def ex_func(*args): + print(f'ex_func({args})') + @pytest.mark.parametrize( 'ipc_pld_spec', [ - # _codec._def_msgspec_codec, Any, - # _codec._def_tractor_codec, - NamespacePath|None, + NamespacePath, + NamespacePath|None, # the "maybe" spec Bo ], ids=[ 'any_type', 'nsp_type', + 'maybe_nsp_type', ] ) +@pytest.mark.parametrize( + 'add_codec_hooks', + [ + True, + False, + ], + ids=['use_codec_hooks', 'no_codec_hooks'], +) def test_codec_hooks_mod( debug_mode: bool, ipc_pld_spec: Union[Type]|Any, + # send_value: None|str|NamespacePath, + add_codec_hooks: bool, ): ''' Audit the `.msg.MsgCodec` override apis details given our impl @@ -297,17 +555,17 @@ def test_codec_hooks_mod( ''' async def main(): + nsp = NamespacePath.from_ref(ex_func) + send_items: dict[Union, Any] = { + Union[None]: None, + Union[NamespacePath]: nsp, + Union[str]: str(nsp), + } - # task: trio.Task = trio.lowlevel.current_task() - - # ContextVar - # task_ctx: Context = task.context - # assert _ctxvar_MsgCodec not in task_ctx - - # TreeVar - # def_codec: MsgCodec = _ctxvar_MsgCodec.get_in(task) - def_codec = _ctxvar_MsgCodec.get() - assert def_codec is _codec._def_tractor_codec + # init default state for actor + chk_codec_applied( + expect_codec=_codec._def_tractor_codec, + ) async with tractor.open_nursery( debug_mode=debug_mode, @@ -323,79 +581,97 @@ def test_codec_hooks_mod( # `NamespacePath` nsp_codec: MsgCodec = mk_custom_codec( pld_spec=ipc_pld_spec, + add_hooks=add_codec_hooks, ) with apply_codec(nsp_codec) as codec: chk_codec_applied( - custom_codec=nsp_codec, + expect_codec=nsp_codec, enter_value=codec, ) + expect_ipc_send: dict[str, tuple[Any, bool]] = {} + + report: str = ( + 'Parent report on send values with\n' + f'ipc_pld_spec: {ipc_pld_spec}\n' + ' ------ - ------\n' + ) + for val_type_str, val, expect_send in iter_maybe_sends( + send_items, + ipc_pld_spec, + add_codec_hooks=add_codec_hooks, + ): + report += ( + f'send_value: {val}: {type(val)} ' + f'=> expect_send: {expect_send}\n' + ) + expect_ipc_send[val_type_str] = (val, expect_send) + + print( + report + + ' ------ - ------\n' + ) + assert len(expect_ipc_send) == len(send_items) + # now try over real IPC with a the subactor + # expect_ipc_rountrip: bool = True + expected_started = Started( + cid='cid', + pld=str(ipc_pld_spec), + ) + # build list of values we expect to receive from + # the subactor. + expect_to_send: list[Any] = [ + val + for val, expect_send in expect_ipc_send.values() + if expect_send + ] + + pld_spec_type_strs: list[str] = enc_type_union(ipc_pld_spec) + + # TODO: send the original nsp here and + # test with `limit_msg_spec()` above? + # await tractor.pause() + print('PARENT opening IPC ctx!\n') async with ( + p.open_context( send_back_nsp, - # TODO: send the original nsp here and - # test with `limit_msg_spec()` above? expect_debug=debug_mode, - use_any_spec=(ipc_pld_spec==Any), - + pld_spec_type_strs=pld_spec_type_strs, + add_hooks=add_codec_hooks, + started_msg_bytes=nsp_codec.encode(expected_started), + expect_ipc_send=expect_ipc_send, ) as (ctx, first), + ctx.open_stream() as ipc, ): - if ipc_pld_spec is NamespacePath: - assert isinstance(first, NamespacePath) - + # ensure codec is still applied across + # `tractor.Context` + its embedded nursery. + chk_codec_applied( + expect_codec=nsp_codec, + enter_value=codec, + ) print( 'root: ENTERING CONTEXT BLOCK\n' f'type(first): {type(first)}\n' f'first: {first}\n' ) - # ensure codec is still applied across - # `tractor.Context` + its embedded nursery. - chk_codec_applied( - custom_codec=nsp_codec, - enter_value=codec, - ) + expect_to_send.remove(first) - first_nsp = NamespacePath(first) + # TODO: explicit values we expect depending on + # codec config! + # assert first == first_val + # assert first == f'{__name__}:ex_func' - # ensure roundtripping works - wire_bytes: bytes = nsp_codec.encode( - Started( - cid=ctx.cid, - pld=first_nsp + async for next_sent in ipc: + print( + 'Child sent next value\n' + f'{next_sent}: {type(next_sent)}\n' ) - ) - msg: Started = nsp_codec.decode(wire_bytes) - pld = msg.pld - assert pld == first_nsp + expect_to_send.remove(next_sent) - # try a manual decode of the started msg+pld - - # TODO: actually get the decoder loading - # to native once we spec our SCIPP msgspec - # (structurred-conc-inter-proc-protocol) - # implemented as per, - # https://github.com/goodboy/tractor/issues/36 - # - if ipc_pld_spec is NamespacePath: - assert isinstance(first, NamespacePath) - - # `Any`-payload-spec case - else: - assert isinstance(first, str) - assert first == f'{__name__}:ex_func' - - await ipc.send(first) - - with trio.move_on_after(.6): - async for msg in ipc: - print(msg) - - # TODO: as per above - # assert isinstance(msg, NamespacePath) - assert isinstance(msg, str) - await ipc.send(msg) - await trio.sleep(0.1) + # all sent values should have arrived! + assert not expect_to_send await p.cancel_actor() @@ -467,7 +743,7 @@ def chk_pld_type( roundtrip: bool|None = None pld_spec_msg_names: list[str] = [ - td.__name__ for td in types._payload_spec_msgs + td.__name__ for td in _payload_msgs ] for typedef in msg_types: