forked from goodboy/tractor
1
0
Fork 0
tractor/tests/test_caps_based_msging.py

607 lines
16 KiB
Python

'''
Low-level functional audits for our
"capability based messaging"-spec feats.
B~)
'''
from typing import (
Any,
Type,
Union,
)
from contextvars import (
Context,
)
# from inspect import Parameter
from msgspec import (
structs,
msgpack,
# defstruct,
Struct,
ValidationError,
)
import pytest
import tractor
from tractor.msg import (
_codec,
_ctxvar_MsgCodec,
NamespacePath,
MsgCodec,
mk_codec,
apply_codec,
current_codec,
)
from tractor.msg import (
types,
)
from tractor import _state
from tractor.msg.types import (
# PayloadT,
Msg,
Started,
mk_msg_spec,
)
import trio
def test_msg_spec_xor_pld_spec():
    '''
    Requesting both a full IPC msg-spec override and a `Msg.pld`
    payload spec from `mk_codec()` is expected to be rejected with
    a `RuntimeError`: the two options are mutually exclusive.

    '''
    # both knobs at once: an overridden msg-set (`Any`) plus
    # a payload type restriction (`NamespacePath`).
    conflicting_specs: dict[str, Any] = {
        'ipc_msg_spec': Any,
        'ipc_pld_spec': NamespacePath,
    }
    with pytest.raises(RuntimeError):
        mk_codec(**conflicting_specs)
def ex_func(*args):
    '''
    Dummy module-level function used as the target of
    `NamespacePath.from_ref()` in the tests below; it only echoes
    the positional args it was called with.

    '''
    call_repr: str = f'ex_func({args})'
    print(call_repr)
def mk_custom_codec(
pld_spec: Union[Type]|Any,
) -> MsgCodec:
'''
Create custom `msgpack` enc/dec-hooks and set a `Decoder`
which only loads `NamespacePath` types.
'''
uid: tuple[str, str] = tractor.current_actor().uid
# XXX NOTE XXX: despite defining `NamespacePath` as a type
# field on our `Msg.pld`, we still need a enc/dec_hook() pair
# to cast to/from that type on the wire. See the docs:
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
def enc_nsp(obj: Any) -> Any:
match obj:
case NamespacePath():
print(
f'{uid}: `NamespacePath`-Only ENCODE?\n'
f'type: {type(obj)}\n'
f'obj: {obj}\n'
)
return str(obj)
logmsg: str = (
f'{uid}: Encoding `{obj}: <{type(obj)}>` not supported'
f'type: {type(obj)}\n'
f'obj: {obj}\n'
)
print(logmsg)
raise NotImplementedError(logmsg)
def dec_nsp(
type: Type,
obj: Any,
) -> Any:
print(
f'{uid}: CUSTOM DECODE\n'
f'input type: {type}\n'
f'obj: {obj}\n'
f'type(obj): `{type(obj).__class__}`\n'
)
nsp = None
# This never seems to hit?
if isinstance(obj, Msg):
print(f'Msg type: {obj}')
if (
type is NamespacePath
and isinstance(obj, str)
and ':' in obj
):
nsp = NamespacePath(obj)
if nsp:
print(f'Returning NSP instance: {nsp}')
return nsp
logmsg: str = (
f'{uid}: Decoding `{obj}: <{type(obj)}>` not supported'
f'input type: {type(obj)}\n'
f'obj: {obj}\n'
f'type(obj): `{type(obj).__class__}`\n'
)
print(logmsg)
raise NotImplementedError(logmsg)
nsp_codec: MsgCodec = mk_codec(
ipc_pld_spec=pld_spec,
# NOTE XXX: the encode hook MUST be used no matter what since
# our `NamespacePath` is not any of a `Any` native type nor
# a `msgspec.Struct` subtype - so `msgspec` has no way to know
# how to encode it unless we provide the custom hook.
#
# AGAIN that is, regardless of whether we spec an
# `Any`-decoded-pld the enc has no knowledge (by default)
# how to enc `NamespacePath` (nsp), so we add a custom
# hook to do that ALWAYS.
enc_hook=enc_nsp,
# XXX NOTE: pretty sure this is mutex with the `type=` to
# `Decoder`? so it won't work in tandem with the
# `ipc_pld_spec` passed above?
dec_hook=dec_nsp,
)
return nsp_codec
@tractor.context
async def send_back_nsp(
    # NOTE: this must be the IPC `tractor.Context`; the bare
    # `Context` name imported at the top of this module is
    # `contextvars.Context`, which has none of the `.cid`,
    # `.started()` or `.open_stream()` members used below.
    ctx: tractor.Context,
    expect_debug: bool,
    use_any_spec: bool,

) -> None:
    '''
    Setup up a custom codec to load instances of `NamespacePath`
    and ensure we can round trip a func ref with our parent.

    - `expect_debug`: the debug-mode state this (child) actor is
      expected to report via `_state.debug_mode()`.
    - `use_any_spec`: when set the custom codec is built with an
      `Any` payload spec and streamed msgs are expected to arrive
      as plain `str`; otherwise the spec is `NamespacePath` and
      streamed msgs must be `NamespacePath` instances.

    '''
    # debug mode sanity check
    assert expect_debug == _state.debug_mode()

    # before applying anything the task must still see the
    # default `tractor` codec.
    curr_codec = _ctxvar_MsgCodec.get()
    assert curr_codec is _codec._def_tractor_codec

    if use_any_spec:
        pld_spec = Any
    else:
        # NOTE: don't need the |None here since
        # the parent side will never send `None` like
        # we do here in the implicit return at the end of this
        # `@context` body.
        pld_spec = NamespacePath  # |None

    nsp_codec: MsgCodec = mk_custom_codec(
        pld_spec=pld_spec,
    )
    with apply_codec(nsp_codec) as codec:
        chk_codec_applied(
            custom_codec=nsp_codec,
            enter_value=codec,
        )

        # ensure roundtripping works locally
        nsp = NamespacePath.from_ref(ex_func)
        wire_bytes: bytes = nsp_codec.encode(
            Started(
                cid=ctx.cid,
                pld=nsp
            )
        )
        msg: Started = nsp_codec.decode(wire_bytes)
        pld = msg.pld
        assert pld == nsp

        await ctx.started(nsp)
        async with ctx.open_stream() as ipc:
            # echo every received msg back to the parent after
            # checking it decoded to the expected type.
            async for msg in ipc:

                if use_any_spec:
                    assert msg == f'{__name__}:ex_func'

                    # TODO: as per below
                    # assert isinstance(msg, NamespacePath)
                    assert isinstance(msg, str)
                else:
                    assert isinstance(msg, NamespacePath)

                await ipc.send(msg)
def chk_codec_applied(
    custom_codec: MsgCodec,
    enter_value: MsgCodec,

) -> None:
    '''
    Assert that `custom_codec` is the codec currently applied for
    the calling task.

    - `custom_codec`: the instance returned from `mk_codec()`.
    - `enter_value`: the value yielded by `apply_codec()`.

    Checks that both are the very same object as the one read from
    `_ctxvar_MsgCodec` and from the public `current_codec()`, and
    that this object is neither of the module-default codecs.

    Raises `AssertionError` on any mismatch; returns nothing.

    '''
    curr_codec: MsgCodec = _ctxvar_MsgCodec.get()

    # two consecutive reads of the var must agree
    last_read_codec = _ctxvar_MsgCodec.get()
    assert curr_codec is last_read_codec

    assert (
        # returned from `mk_codec()`
        custom_codec is

        # yielded value from `apply_codec()`
        enter_value is

        # read from current task's `contextvars.Context`
        curr_codec is

        # public API for all of the above
        current_codec()
    )

    # NOTE: these are checked one-by-one on purpose. Chained
    # comparisons are evaluated pairwise, so appending
    # `.. is not <def_a> is not <def_b>` to the chain above only
    # compares the two defaults against *each other* in its last
    # link and never checks the applied codec against `<def_b>`.
    for default_codec in (
        # the default `msgspec` settings
        _codec._def_msgspec_codec,

        # the default `tractor` settings
        _codec._def_tractor_codec,
    ):
        assert curr_codec is not default_codec
@pytest.mark.parametrize(
    'ipc_pld_spec',
    [
        # _codec._def_msgspec_codec,
        Any,
        # _codec._def_tractor_codec,
        NamespacePath|None,
    ],
    ids=[
        'any_type',
        'nsp_type',
    ]
)
def test_codec_hooks_mod(
    debug_mode: bool,
    ipc_pld_spec: Union[Type]|Any,
):
    '''
    Audit the `.msg.MsgCodec` override apis details given our impl
    uses `contextvars` to accomplish per `trio` task codec
    application around an inter-proc-task-comms context.

    A sub-actor running `send_back_nsp()` is spawned, a custom
    codec is applied on this (root) side and a `NamespacePath` ref
    is bounced back and forth over the resulting stream.

    '''
    async def main():
        # prior to any override the default `tractor` codec must
        # be the one in effect.
        assert _ctxvar_MsgCodec.get() is _codec._def_tractor_codec

        # tell the child which payload-spec flavour to use
        child_uses_any_spec: bool = (ipc_pld_spec==Any)

        async with tractor.open_nursery(
            debug_mode=debug_mode,
        ) as actor_nursery:
            portal: tractor.Portal = await actor_nursery.start_actor(
                'sub',
                enable_modules=[__name__],
            )

            # TODO: 2 cases:
            # - codec not modified -> decode nsp as `str`
            # - codec modified with hooks -> decode nsp as
            #   `NamespacePath`
            nsp_codec: MsgCodec = mk_custom_codec(
                pld_spec=ipc_pld_spec,
            )
            with apply_codec(nsp_codec) as applied_codec:
                chk_codec_applied(
                    custom_codec=nsp_codec,
                    enter_value=applied_codec,
                )

                async with (
                    portal.open_context(
                        send_back_nsp,
                        # TODO: send the original nsp here and
                        # test with `limit_msg_spec()` above?
                        expect_debug=debug_mode,
                        use_any_spec=child_uses_any_spec,

                    ) as (ctx, first),
                    ctx.open_stream() as stream,
                ):
                    # NOTE(review): the parametrized specs above are
                    # `Any` and `NamespacePath|None`, so this identity
                    # check against the bare `NamespacePath` type looks
                    # like it can never be true - confirm intent.
                    if ipc_pld_spec is NamespacePath:
                        assert isinstance(first, NamespacePath)

                    print(
                        'root: ENTERING CONTEXT BLOCK\n'
                        f'type(first): {type(first)}\n'
                        f'first: {first}\n'
                    )
                    # ensure codec is still applied across
                    # `tractor.Context` + its embedded nursery.
                    chk_codec_applied(
                        custom_codec=nsp_codec,
                        enter_value=applied_codec,
                    )

                    first_nsp = NamespacePath(first)

                    # ensure roundtripping works
                    started_msg = Started(
                        cid=ctx.cid,
                        pld=first_nsp
                    )
                    wire_bytes: bytes = nsp_codec.encode(started_msg)
                    decoded: Started = nsp_codec.decode(wire_bytes)
                    assert decoded.pld == first_nsp

                    # try a manual decode of the started msg+pld

                    # TODO: actually get the decoder loading
                    # to native once we spec our SCIPP msgspec
                    # (structured-conc-inter-proc-protocol)
                    # implemented as per,
                    # https://github.com/goodboy/tractor/issues/36
                    #
                    if ipc_pld_spec is not NamespacePath:
                        # `Any`-payload-spec case
                        assert isinstance(first, str)
                        assert first == f'{__name__}:ex_func'
                    else:
                        assert isinstance(first, NamespacePath)

                    await stream.send(first)

                    # ping-pong with the child until the timeout
                    # scope cancels the loop.
                    with trio.move_on_after(.6):
                        async for echoed in stream:
                            print(echoed)

                            # TODO: as per above
                            # assert isinstance(msg, NamespacePath)
                            assert isinstance(echoed, str)
                            await stream.send(echoed)
                            await trio.sleep(0.1)

            await portal.cancel_actor()

    trio.run(main)
def chk_pld_type(
    payload_spec: Type[Struct]|Any,
    pld: Any,

    expect_roundtrip: bool|None = None,

) -> bool:
    '''
    Build a codec limited to `payload_spec` and try to round-trip
    `pld` boxed inside every payload-spec-able msg type.

    - `payload_spec`: the type (union) allowed for `Msg.pld`.
    - `pld`: the payload value to box, encode and decode.
    - `expect_roundtrip`: optional expectation compared against the
      result of a successful decode.

    Returns `True` when the (last checked) msg decoded back equal
    to what was encoded, `False` when decoding was rejected with
    a `ValidationError` because `type(pld)` is not `payload_spec`.

    Raises `ValueError` when validation disagrees with the type
    comparison, and `AssertionError` when any of the codec
    invariants (or `expect_roundtrip`) do not hold, including the
    case where no msg type was exercised at all.

    '''
    pld_val_type: Type = type(pld)

    # TODO: verify that the overridden subtypes
    # DO NOT have modified type-annots from original!
    # 'Start',  .pld: FuncSpec
    # 'StartAck',  .pld: IpcCtxSpec
    # 'Stop',  .pld: UNSET
    # 'Error',  .pld: ErrorData

    codec: MsgCodec = mk_codec(
        # NOTE: this ONLY accepts `Msg.pld` fields of a specified
        # type union.
        ipc_pld_spec=payload_spec,
    )

    # make a one-off dec to compare with our `MsgCodec` instance
    # which does the below `mk_msg_spec()` call internally
    ipc_msg_spec: Union[Type[Struct]]
    msg_types: list[Msg[payload_spec]]
    (
        ipc_msg_spec,
        msg_types,
    ) = mk_msg_spec(
        payload_type_union=payload_spec,
    )
    _enc = msgpack.Encoder()
    _dec = msgpack.Decoder(
        type=ipc_msg_spec or Any,  # like `Msg[Any]`
    )

    assert (
        payload_spec
        ==
        codec.pld_spec
    )

    # assert codec.dec == dec
    #
    # ^-XXX-^ not sure why these aren't "equal" but when cast
    # to `str` they seem to match ?? .. kk
    assert (
        str(ipc_msg_spec)
        ==
        str(codec.msg_spec)
        ==
        str(_dec.type)
        ==
        str(codec.dec.type)
    )

    # verify the boxed-type for all variable payload-type msgs.
    #
    # NOTE: fail loudly instead of trapping into a debugger so an
    # unattended (CI) run reports an error rather than blocking.
    assert msg_types, (
        f'`mk_msg_spec()` produced no msg types for {payload_spec}'
    )

    roundtrip: bool|None = None
    pld_spec_msg_names: list[str] = [
        td.__name__ for td in types._payload_spec_msgs
    ]
    for typedef in msg_types:

        # only msgs with a spec-able `.pld` are of interest
        if typedef.__name__ not in pld_spec_msg_names:
            continue

        pld_field = structs.fields(typedef)[1]
        # TODO: does this need to work to get all subtypes to adhere?
        assert pld_field.type is payload_spec

        kwargs: dict[str, Any] = {
            'cid': '666',
            'pld': pld,
        }
        enc_msg: Msg = typedef(**kwargs)

        # encoding is never restricted by the payload spec, so the
        # plain encoder and the codec's must agree byte-for-byte.
        _wire_bytes: bytes = _enc.encode(enc_msg)
        wire_bytes: bytes = codec.enc.encode(enc_msg)
        assert _wire_bytes == wire_bytes

        try:
            dec_msg = codec.dec.decode(wire_bytes)
            _dec_msg = _dec.decode(wire_bytes)

            # decoded msg and thus payload should be exactly same!
            assert (roundtrip := (
                _dec_msg
                ==
                dec_msg
                ==
                enc_msg
            ))

            if expect_roundtrip is not None:
                assert expect_roundtrip == roundtrip, (
                    f'Expected roundtrip={expect_roundtrip} but got '
                    f'{roundtrip} for {typedef.__name__}'
                )

            assert (
                pld
                ==
                dec_msg.pld
                ==
                enc_msg.pld
            )

        except ValidationError as ve:
            roundtrip = False
            if pld_val_type is payload_spec:
                raise ValueError(
                    'Got `ValidationError` despite type-var match!?\n'
                    f'pld_val_type: {pld_val_type}\n'
                    f'payload_type: {payload_spec}\n'
                ) from ve

            # ow we good cuz the pld spec mismatched.
            print(
                'Got expected `ValidationError` since,\n'
                f'{pld_val_type} is not {payload_spec}\n'
            )
        else:
            # decode succeeded: that is only legit when the spec
            # is unrestricted or exactly the payload's type.
            if (
                payload_spec is not Any
                and
                pld_val_type is not payload_spec
            ):
                raise ValueError(
                    'DID NOT raise `ValidationError` despite payload '
                    'type mismatch!?\n'
                    f'pld_val_type: {pld_val_type}\n'
                    f'payload_type: {payload_spec}\n'
                )

    # full code decode should always be attempted!
    #
    # NOTE: returning `None` here would silently satisfy any
    # `assert not chk_pld_type(..)` caller, so treat it as an error.
    assert roundtrip is not None, (
        'No payload-spec-able msg type was ever round-tripped!'
    )

    return roundtrip
def test_limit_msgspec():
    '''
    Drive `chk_pld_type()` from inside a root actor with a few
    payload-spec / payload-value combos: matching pairs must
    round-trip, mismatching pairs must not.

    '''
    async def main():
        async with tractor.open_root_actor(
            debug_mode=True
        ):
            # parametrize the boxed `.pld` type as a custom-struct
            # and ensure that parametrization propagates
            # to all payload-msg-spec-able subtypes!
            class CustomPayload(Struct):
                name: str
                value: Any

            # ensure we can round-trip a boxing `Msg`
            any_spec_ok: bool = chk_pld_type(
                payload_spec=Any,
                pld=None,
                expect_roundtrip=True,
            )
            assert any_spec_ok

            # TODO: don't need this any more right since
            # `msgspec>=0.15` has the nice generics stuff yah??
            #
            # manually override the type annot of the payload
            # field and ensure it propagates to all msg-subtypes.
            # Msg.__annotations__['pld'] = Any

            # verify that a mis-typed payload value won't decode
            str_as_int: bool = chk_pld_type(
                payload_spec=int,
                pld='doggy',
            )
            assert not str_as_int

            # a plain `str` is not a `CustomPayload` ..
            str_as_struct: bool = chk_pld_type(
                payload_spec=CustomPayload,
                pld='doggy',
            )
            assert not str_as_struct

            # .. whereas a real instance is.
            struct_as_struct: bool = chk_pld_type(
                payload_spec=CustomPayload,
                pld=CustomPayload(name='doggy', value='urmom'),
            )
            assert struct_as_struct

            # uhh bc we can `.pause_from_sync()` now! :surfer:
            # breakpoint()

    trio.run(main)