Get `test_codec_hooks_mod` working with `Msg`s

Though the runtime hasn't been changed over in this patch (it was in the
local index at the time however), the test does now demonstrate that
using a `Started` the correctly typed `.pld` will codec correctly when
passed manually to `MsgCodec.encode/decode()`.

Despite not having the runtime ported to the new shuttle msg set
(meaning the mentioned test will fail without the runtime port patch),
I was able to get this first original test working that limits payload
packets as a `Msg.pld: NamespacePath`this as long as we spec
`enc/dec_hook()`s then the `Msg.pld` will be processed correctly as per:
https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
in both the `Any` and `NamespacePath|None` spec cases.
^- turns out in this case -^ that the codec hooks only get invoked on
the unknown-fields NOT the entire `Struct`-msg.

A further gotcha was merging a `|None` into the `pld_spec` since this
test spawns a subactor and opens a context via `send_back_nsp()` and
that func has no explicit `return` - so of course it delivers
a `Return(pld=None)` which will fail if we only spec `NamespacePath`.
Tyler Goodlet 2024-04-02 11:14:43 -04:00
parent a315f01acc
commit 3a105e2830
1 changed files with 236 additions and 69 deletions

View File

@ -7,7 +7,6 @@ B~)
''' '''
from typing import ( from typing import (
Any, Any,
_GenericAlias,
Type, Type,
Union, Union,
) )
@ -26,20 +25,23 @@ from msgspec import (
import pytest import pytest
import tractor import tractor
from tractor.msg import ( from tractor.msg import (
_def_msgspec_codec, _codec,
_ctxvar_MsgCodec, _ctxvar_MsgCodec,
NamespacePath, NamespacePath,
MsgCodec, MsgCodec,
mk_codec, mk_codec,
apply_codec, apply_codec,
current_msgspec_codec, current_codec,
) )
from tractor.msg import types from tractor.msg import (
types,
)
from tractor import _state
from tractor.msg.types import ( from tractor.msg.types import (
# PayloadT, # PayloadT,
Msg, Msg,
# Started, Started,
mk_msg_spec, mk_msg_spec,
) )
import trio import trio
@ -60,56 +62,110 @@ def test_msg_spec_xor_pld_spec():
) )
# TODO: wrap these into `._codec` such that user can just pass
# a type table of some sort?
def enc_hook(obj: Any) -> Any:
if isinstance(obj, NamespacePath):
return str(obj)
else:
raise NotImplementedError(
f'Objects of type {type(obj)} are not supported'
)
def dec_hook(type: Type, obj: Any) -> Any:
print(f'type is: {type}')
if type is NamespacePath:
return NamespacePath(obj)
else:
raise NotImplementedError(
f'Objects of type {type(obj)} are not supported'
)
def ex_func(*args): def ex_func(*args):
print(f'ex_func({args})') print(f'ex_func({args})')
def mk_custom_codec( def mk_custom_codec(
ipc_msg_spec: Type[Any] = Any, pld_spec: Union[Type]|Any,
) -> MsgCodec: ) -> MsgCodec:
# apply custom hooks and set a `Decoder` which only '''
# loads `NamespacePath` types. Create custom `msgpack` enc/dec-hooks and set a `Decoder`
nsp_codec: MsgCodec = mk_codec( which only loads `NamespacePath` types.
ipc_msg_spec=ipc_msg_spec,
enc_hook=enc_hook, '''
dec_hook=dec_hook, uid: tuple[str, str] = tractor.current_actor().uid
# XXX NOTE XXX: despite defining `NamespacePath` as a type
# field on our `Msg.pld`, we still need a enc/dec_hook() pair
# to cast to/from that type on the wire. See the docs:
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
def enc_nsp(obj: Any) -> Any:
match obj:
case NamespacePath():
print(
f'{uid}: `NamespacePath`-Only ENCODE?\n'
f'type: {type(obj)}\n'
f'obj: {obj}\n'
) )
# TODO: validate `MsgCodec` interface/semantics? return str(obj)
# -[ ] simple field tests to ensure caching + reset is workin?
# -[ ] custom / changing `.decoder()` calls? logmsg: str = (
f'{uid}: Encoding `{obj}: <{type(obj)}>` not supported'
f'type: {type(obj)}\n'
f'obj: {obj}\n'
)
print(logmsg)
raise NotImplementedError(logmsg)
def dec_nsp(
type: Type,
obj: Any,
) -> Any:
print(
f'{uid}: CUSTOM DECODE\n'
f'input type: {type}\n'
f'obj: {obj}\n'
f'type(obj): `{type(obj).__class__}`\n'
)
nsp = None
# This never seems to hit?
if isinstance(obj, Msg):
print(f'Msg type: {obj}')
if (
type is NamespacePath
and isinstance(obj, str)
and ':' in obj
):
nsp = NamespacePath(obj)
if nsp:
print(f'Returning NSP instance: {nsp}')
return nsp
logmsg: str = (
f'{uid}: Decoding `{obj}: <{type(obj)}>` not supported'
f'input type: {type(obj)}\n'
f'obj: {obj}\n'
f'type(obj): `{type(obj).__class__}`\n'
)
print(logmsg)
raise NotImplementedError(logmsg)
nsp_codec: MsgCodec = mk_codec(
ipc_pld_spec=pld_spec,
# NOTE XXX: the encode hook MUST be used no matter what since
# our `NamespacePath` is not any of a `Any` native type nor
# a `msgspec.Struct` subtype - so `msgspec` has no way to know
# how to encode it unless we provide the custom hook.
# #
# dec = nsp_codec.decoder( # AGAIN that is, regardless of whether we spec an
# types=NamespacePath, # `Any`-decoded-pld the enc has no knowledge (by default)
# ) # how to enc `NamespacePath` (nsp), so we add a custom
# assert nsp_codec.dec is dec # hook to do that ALWAYS.
enc_hook=enc_nsp,
# XXX NOTE: pretty sure this is mutex with the `type=` to
# `Decoder`? so it won't work in tandem with the
# `ipc_pld_spec` passed above?
dec_hook=dec_nsp,
)
return nsp_codec return nsp_codec
@tractor.context @tractor.context
async def send_back_nsp( async def send_back_nsp(
ctx: tractor.Context, ctx: Context,
expect_debug: bool,
use_any_spec: bool,
) -> None: ) -> None:
''' '''
@ -117,28 +173,65 @@ async def send_back_nsp(
and ensure we can round trip a func ref with our parent. and ensure we can round trip a func ref with our parent.
''' '''
task: trio.Task = trio.lowlevel.current_task() # debug mode sanity check
task_ctx: Context = task.context assert expect_debug == _state.debug_mode()
assert _ctxvar_MsgCodec not in task_ctx
nsp_codec: MsgCodec = mk_custom_codec() # task: trio.Task = trio.lowlevel.current_task()
# TreeVar
# curr_codec = _ctxvar_MsgCodec.get_in(task)
# ContextVar
# task_ctx: Context = task.context
# assert _ctxvar_MsgCodec not in task_ctx
curr_codec = _ctxvar_MsgCodec.get()
assert curr_codec is _codec._def_tractor_codec
if use_any_spec:
pld_spec = Any
else:
# NOTE: don't need the |None here since
# the parent side will never send `None` like
# we do here in the implicit return at the end of this
# `@context` body.
pld_spec = NamespacePath # |None
nsp_codec: MsgCodec = mk_custom_codec(
pld_spec=pld_spec,
)
with apply_codec(nsp_codec) as codec: with apply_codec(nsp_codec) as codec:
chk_codec_applied( chk_codec_applied(
custom_codec=nsp_codec, custom_codec=nsp_codec,
enter_value=codec, enter_value=codec,
) )
# ensure roundtripping works locally
nsp = NamespacePath.from_ref(ex_func) nsp = NamespacePath.from_ref(ex_func)
await ctx.started(nsp) wire_bytes: bytes = nsp_codec.encode(
Started(
cid=ctx.cid,
pld=nsp
)
)
msg: Started = nsp_codec.decode(wire_bytes)
pld = msg.pld
assert pld == nsp
await ctx.started(nsp)
async with ctx.open_stream() as ipc: async with ctx.open_stream() as ipc:
async for msg in ipc: async for msg in ipc:
if use_any_spec:
assert msg == f'{__name__}:ex_func' assert msg == f'{__name__}:ex_func'
# TODO: as per below # TODO: as per below
# assert isinstance(msg, NamespacePath) # assert isinstance(msg, NamespacePath)
assert isinstance(msg, str) assert isinstance(msg, str)
else:
assert isinstance(msg, NamespacePath)
await ipc.send(msg)
def chk_codec_applied( def chk_codec_applied(
@ -146,11 +239,20 @@ def chk_codec_applied(
enter_value: MsgCodec, enter_value: MsgCodec,
) -> MsgCodec: ) -> MsgCodec:
task: trio.Task = trio.lowlevel.current_task() # task: trio.Task = trio.lowlevel.current_task()
task_ctx: Context = task.context
assert _ctxvar_MsgCodec in task_ctx # TreeVar
curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec] # curr_codec = _ctxvar_MsgCodec.get_in(task)
# ContextVar
# task_ctx: Context = task.context
# assert _ctxvar_MsgCodec in task_ctx
# curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec]
# RunVar
curr_codec: MsgCodec = _ctxvar_MsgCodec.get()
last_read_codec = _ctxvar_MsgCodec.get()
assert curr_codec is last_read_codec
assert ( assert (
# returned from `mk_codec()` # returned from `mk_codec()`
@ -163,14 +265,31 @@ def chk_codec_applied(
curr_codec is curr_codec is
# public API for all of the above # public API for all of the above
current_msgspec_codec() current_codec()
# the default `msgspec` settings # the default `msgspec` settings
is not _def_msgspec_codec is not _codec._def_msgspec_codec
is not _codec._def_tractor_codec
) )
def test_codec_hooks_mod(): @pytest.mark.parametrize(
'ipc_pld_spec',
[
# _codec._def_msgspec_codec,
Any,
# _codec._def_tractor_codec,
NamespacePath|None,
],
ids=[
'any_type',
'nsp_type',
]
)
def test_codec_hooks_mod(
debug_mode: bool,
ipc_pld_spec: Union[Type]|Any,
):
''' '''
Audit the `.msg.MsgCodec` override apis details given our impl Audit the `.msg.MsgCodec` override apis details given our impl
uses `contextvars` to accomplish per `trio` task codec uses `contextvars` to accomplish per `trio` task codec
@ -178,11 +297,21 @@ def test_codec_hooks_mod():
''' '''
async def main(): async def main():
task: trio.Task = trio.lowlevel.current_task()
task_ctx: Context = task.context
assert _ctxvar_MsgCodec not in task_ctx
async with tractor.open_nursery() as an: # task: trio.Task = trio.lowlevel.current_task()
# ContextVar
# task_ctx: Context = task.context
# assert _ctxvar_MsgCodec not in task_ctx
# TreeVar
# def_codec: MsgCodec = _ctxvar_MsgCodec.get_in(task)
def_codec = _ctxvar_MsgCodec.get()
assert def_codec is _codec._def_tractor_codec
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
p: tractor.Portal = await an.start_actor( p: tractor.Portal = await an.start_actor(
'sub', 'sub',
enable_modules=[__name__], enable_modules=[__name__],
@ -192,7 +321,9 @@ def test_codec_hooks_mod():
# - codec not modified -> decode nsp as `str` # - codec not modified -> decode nsp as `str`
# - codec modified with hooks -> decode nsp as # - codec modified with hooks -> decode nsp as
# `NamespacePath` # `NamespacePath`
nsp_codec: MsgCodec = mk_custom_codec() nsp_codec: MsgCodec = mk_custom_codec(
pld_spec=ipc_pld_spec,
)
with apply_codec(nsp_codec) as codec: with apply_codec(nsp_codec) as codec:
chk_codec_applied( chk_codec_applied(
custom_codec=nsp_codec, custom_codec=nsp_codec,
@ -202,9 +333,22 @@ def test_codec_hooks_mod():
async with ( async with (
p.open_context( p.open_context(
send_back_nsp, send_back_nsp,
# TODO: send the original nsp here and
# test with `limit_msg_spec()` above?
expect_debug=debug_mode,
use_any_spec=(ipc_pld_spec==Any),
) as (ctx, first), ) as (ctx, first),
ctx.open_stream() as ipc, ctx.open_stream() as ipc,
): ):
if ipc_pld_spec is NamespacePath:
assert isinstance(first, NamespacePath)
print(
'root: ENTERING CONTEXT BLOCK\n'
f'type(first): {type(first)}\n'
f'first: {first}\n'
)
# ensure codec is still applied across # ensure codec is still applied across
# `tractor.Context` + its embedded nursery. # `tractor.Context` + its embedded nursery.
chk_codec_applied( chk_codec_applied(
@ -212,23 +356,46 @@ def test_codec_hooks_mod():
enter_value=codec, enter_value=codec,
) )
assert first == f'{__name__}:ex_func' first_nsp = NamespacePath(first)
# ensure roundtripping works
wire_bytes: bytes = nsp_codec.encode(
Started(
cid=ctx.cid,
pld=first_nsp
)
)
msg: Started = nsp_codec.decode(wire_bytes)
pld = msg.pld
assert pld == first_nsp
# try a manual decode of the started msg+pld
# TODO: actually get the decoder loading # TODO: actually get the decoder loading
# to native once we spec our SCIPP msgspec # to native once we spec our SCIPP msgspec
# (structurred-conc-inter-proc-protocol) # (structurred-conc-inter-proc-protocol)
# implemented as per, # implemented as per,
# https://github.com/goodboy/tractor/issues/36 # https://github.com/goodboy/tractor/issues/36
# #
# assert isinstance(first, NamespacePath) if ipc_pld_spec is NamespacePath:
assert isinstance(first, NamespacePath)
# `Any`-payload-spec case
else:
assert isinstance(first, str) assert isinstance(first, str)
assert first == f'{__name__}:ex_func'
await ipc.send(first) await ipc.send(first)
with trio.move_on_after(1): with trio.move_on_after(.6):
async for msg in ipc: async for msg in ipc:
print(msg)
# TODO: as per above # TODO: as per above
# assert isinstance(msg, NamespacePath) # assert isinstance(msg, NamespacePath)
assert isinstance(msg, str) assert isinstance(msg, str)
await ipc.send(msg)
await trio.sleep(0.1)
await p.cancel_actor() await p.cancel_actor()