forked from goodboy/tractor
Extend codec test to for msg-spec parameterizing
Set a diff `Msg.pld` spec per test and then send multiple types to a child actor making sure the child can only send certain types over a stream and fails with validation or decode errors ow. The test is also param-ed both with and without hooks demonstrating how a custom type, `NamespacePath`, needs them for effective use. The subactor IPC context child is passed a `expect_ipc_send: dict` which relays the values along with their expected `.send()`-ability. Deats on technical refinements: ------ - ------ - added a `iter_maybe_sends()` send-value-as-msg-auditor and predicate generator (literally) so as to be able to pre-determine if given the current codec and `send_values` which values are expected to be IPC transmittable. - as per ^, the diff value-msgs are first round-tripped inside a `Started` msg using the configured codec in the parent/root actor before bothering with using IPC primitives + a subactor; this is how the `expect_ipc_send` table is generated initially. - for serializing the specs (`Union[Type]`s as required by `msgspec`), added a pair of codec hooks: `enc/dec_type_union()` (that ideally we move into a `.msg` submod eventually) which code the type-values as a `list[str]` of names. - the `dec_` hook had to be modified to NOT raise an error when an invalid/unhandled value arrives, this is because we do NOT want the RPC msg handling loop to raise on the `async for msg in chan:` and instead prefer to ignore and warn (for now, but eventually respond with error msg - see notes in hook body) these msgs when sent during a streaming phase; `Context.started()` will however error on a bad input for the current msg-spec since it is part of the "cheap" dialog (again see notes in `._context`) wherein the `Started` msg is always roundtripped prior to `Channel.send()` to guarantee the child adheres to its own spec. - tossed in lotsa `print()`s for console groking of the run progress. Further notes on typed-msging breaking cancellation: ------ - ------ - turns out since the runtime's cancellation implementation, being done with `Actor.cancel()` methods and friends will actually break when a stringent spec is applied (eg. a single type-spec) since the return values from said methods are generally `bool`s.. - this means we do indeed need special handling of "runtime RPC method invocations" since ideally a user's msg-spec choices do not break core functionality on them XD => The obvi solution is to add a/some special sub-`Msg` types for such cases, possibly just a `RuntimeReturn(Return)` type that will always include a `.pld: bool` for these cancel methods such that their results are always handled without msg type errors. More to come on a (hopefully) elegant solution to that last bit!runtime_to_msgspec
parent
5b551dd9fa
commit
10c98946bd
|
@ -5,6 +5,7 @@ Low-level functional audits for our
|
|||
B~)
|
||||
|
||||
'''
|
||||
import typing
|
||||
from typing import (
|
||||
Any,
|
||||
Type,
|
||||
|
@ -23,7 +24,9 @@ from msgspec import (
|
|||
ValidationError,
|
||||
)
|
||||
import pytest
|
||||
|
||||
import tractor
|
||||
from tractor import _state
|
||||
from tractor.msg import (
|
||||
_codec,
|
||||
_ctxvar_MsgCodec,
|
||||
|
@ -34,12 +37,9 @@ from tractor.msg import (
|
|||
apply_codec,
|
||||
current_codec,
|
||||
)
|
||||
from tractor.msg import (
|
||||
types,
|
||||
)
|
||||
from tractor import _state
|
||||
from tractor.msg.types import (
|
||||
# PayloadT,
|
||||
_payload_msgs,
|
||||
log,
|
||||
Msg,
|
||||
Started,
|
||||
mk_msg_spec,
|
||||
|
@ -62,17 +62,14 @@ def test_msg_spec_xor_pld_spec():
|
|||
)
|
||||
|
||||
|
||||
def ex_func(*args):
|
||||
print(f'ex_func({args})')
|
||||
|
||||
|
||||
def mk_custom_codec(
|
||||
pld_spec: Union[Type]|Any,
|
||||
add_hooks: bool,
|
||||
|
||||
) -> MsgCodec:
|
||||
'''
|
||||
Create custom `msgpack` enc/dec-hooks and set a `Decoder`
|
||||
which only loads `NamespacePath` types.
|
||||
which only loads `pld_spec` (like `NamespacePath`) types.
|
||||
|
||||
'''
|
||||
uid: tuple[str, str] = tractor.current_actor().uid
|
||||
|
@ -83,61 +80,75 @@ def mk_custom_codec(
|
|||
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
|
||||
|
||||
def enc_nsp(obj: Any) -> Any:
|
||||
print(f'{uid} ENC HOOK')
|
||||
match obj:
|
||||
case NamespacePath():
|
||||
print(
|
||||
f'{uid}: `NamespacePath`-Only ENCODE?\n'
|
||||
f'type: {type(obj)}\n'
|
||||
f'obj: {obj}\n'
|
||||
f'obj-> `{obj}`: {type(obj)}\n'
|
||||
)
|
||||
|
||||
# if type(obj) != NamespacePath:
|
||||
# breakpoint()
|
||||
return str(obj)
|
||||
|
||||
logmsg: str = (
|
||||
f'{uid}: Encoding `{obj}: <{type(obj)}>` not supported'
|
||||
f'type: {type(obj)}\n'
|
||||
f'obj: {obj}\n'
|
||||
print(
|
||||
f'{uid}\n'
|
||||
'CUSTOM ENCODE\n'
|
||||
f'obj-arg-> `{obj}`: {type(obj)}\n'
|
||||
)
|
||||
logmsg: str = (
|
||||
f'{uid}\n'
|
||||
'FAILED ENCODE\n'
|
||||
f'obj-> `{obj}: {type(obj)}`\n'
|
||||
)
|
||||
print(logmsg)
|
||||
raise NotImplementedError(logmsg)
|
||||
|
||||
def dec_nsp(
|
||||
type: Type,
|
||||
obj_type: Type,
|
||||
obj: Any,
|
||||
|
||||
) -> Any:
|
||||
print(
|
||||
f'{uid}: CUSTOM DECODE\n'
|
||||
f'input type: {type}\n'
|
||||
f'obj: {obj}\n'
|
||||
f'type(obj): `{type(obj).__class__}`\n'
|
||||
f'{uid}\n'
|
||||
'CUSTOM DECODE\n'
|
||||
f'type-arg-> {obj_type}\n'
|
||||
f'obj-arg-> `{obj}`: {type(obj)}\n'
|
||||
)
|
||||
nsp = None
|
||||
|
||||
# This never seems to hit?
|
||||
if isinstance(obj, Msg):
|
||||
print(f'Msg type: {obj}')
|
||||
|
||||
if (
|
||||
type is NamespacePath
|
||||
obj_type is NamespacePath
|
||||
and isinstance(obj, str)
|
||||
and ':' in obj
|
||||
):
|
||||
nsp = NamespacePath(obj)
|
||||
# TODO: we could built a generic handler using
|
||||
# JUST matching the obj_type part?
|
||||
# nsp = obj_type(obj)
|
||||
|
||||
if nsp:
|
||||
print(f'Returning NSP instance: {nsp}')
|
||||
return nsp
|
||||
|
||||
logmsg: str = (
|
||||
f'{uid}: Decoding `{obj}: <{type(obj)}>` not supported'
|
||||
f'input type: {type(obj)}\n'
|
||||
f'obj: {obj}\n'
|
||||
f'type(obj): `{type(obj).__class__}`\n'
|
||||
f'{uid}\n'
|
||||
'FAILED DECODE\n'
|
||||
f'type-> {obj_type}\n'
|
||||
f'obj-arg-> `{obj}`: {type(obj)}\n'
|
||||
)
|
||||
print(logmsg)
|
||||
raise NotImplementedError(logmsg)
|
||||
|
||||
# TODO: figure out the ignore subsys for this!
|
||||
# -[ ] option whether to defense-relay backc the msg
|
||||
# inside an `Invalid`/`Ignore`
|
||||
# -[ ] how to make this handling pluggable such that a
|
||||
# `Channel`/`MsgTransport` can intercept and process
|
||||
# back msgs either via exception handling or some other
|
||||
# signal?
|
||||
log.warning(logmsg)
|
||||
# NOTE: this delivers the invalid
|
||||
# value up to `msgspec`'s decoding
|
||||
# machinery for error raising.
|
||||
return obj
|
||||
# raise NotImplementedError(logmsg)
|
||||
|
||||
nsp_codec: MsgCodec = mk_codec(
|
||||
ipc_pld_spec=pld_spec,
|
||||
|
@ -151,97 +162,32 @@ def mk_custom_codec(
|
|||
# `Any`-decoded-pld the enc has no knowledge (by default)
|
||||
# how to enc `NamespacePath` (nsp), so we add a custom
|
||||
# hook to do that ALWAYS.
|
||||
enc_hook=enc_nsp,
|
||||
enc_hook=enc_nsp if add_hooks else None,
|
||||
|
||||
# XXX NOTE: pretty sure this is mutex with the `type=` to
|
||||
# `Decoder`? so it won't work in tandem with the
|
||||
# `ipc_pld_spec` passed above?
|
||||
dec_hook=dec_nsp,
|
||||
dec_hook=dec_nsp if add_hooks else None,
|
||||
)
|
||||
return nsp_codec
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def send_back_nsp(
|
||||
ctx: Context,
|
||||
expect_debug: bool,
|
||||
use_any_spec: bool,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Setup up a custom codec to load instances of `NamespacePath`
|
||||
and ensure we can round trip a func ref with our parent.
|
||||
|
||||
'''
|
||||
# debug mode sanity check
|
||||
assert expect_debug == _state.debug_mode()
|
||||
|
||||
# task: trio.Task = trio.lowlevel.current_task()
|
||||
|
||||
# TreeVar
|
||||
# curr_codec = _ctxvar_MsgCodec.get_in(task)
|
||||
|
||||
# ContextVar
|
||||
# task_ctx: Context = task.context
|
||||
# assert _ctxvar_MsgCodec not in task_ctx
|
||||
|
||||
curr_codec = _ctxvar_MsgCodec.get()
|
||||
assert curr_codec is _codec._def_tractor_codec
|
||||
|
||||
if use_any_spec:
|
||||
pld_spec = Any
|
||||
else:
|
||||
# NOTE: don't need the |None here since
|
||||
# the parent side will never send `None` like
|
||||
# we do here in the implicit return at the end of this
|
||||
# `@context` body.
|
||||
pld_spec = NamespacePath # |None
|
||||
|
||||
nsp_codec: MsgCodec = mk_custom_codec(
|
||||
pld_spec=pld_spec,
|
||||
)
|
||||
with apply_codec(nsp_codec) as codec:
|
||||
chk_codec_applied(
|
||||
custom_codec=nsp_codec,
|
||||
enter_value=codec,
|
||||
)
|
||||
|
||||
# ensure roundtripping works locally
|
||||
nsp = NamespacePath.from_ref(ex_func)
|
||||
wire_bytes: bytes = nsp_codec.encode(
|
||||
Started(
|
||||
cid=ctx.cid,
|
||||
pld=nsp
|
||||
)
|
||||
)
|
||||
msg: Started = nsp_codec.decode(wire_bytes)
|
||||
pld = msg.pld
|
||||
assert pld == nsp
|
||||
|
||||
await ctx.started(nsp)
|
||||
async with ctx.open_stream() as ipc:
|
||||
async for msg in ipc:
|
||||
|
||||
if use_any_spec:
|
||||
assert msg == f'{__name__}:ex_func'
|
||||
|
||||
# TODO: as per below
|
||||
# assert isinstance(msg, NamespacePath)
|
||||
assert isinstance(msg, str)
|
||||
else:
|
||||
assert isinstance(msg, NamespacePath)
|
||||
|
||||
await ipc.send(msg)
|
||||
|
||||
|
||||
def chk_codec_applied(
|
||||
custom_codec: MsgCodec,
|
||||
enter_value: MsgCodec,
|
||||
expect_codec: MsgCodec,
|
||||
enter_value: MsgCodec|None = None,
|
||||
|
||||
) -> MsgCodec:
|
||||
'''
|
||||
buncha sanity checks ensuring that the IPC channel's
|
||||
context-vars are set to the expected codec and that are
|
||||
ctx-var wrapper APIs match the same.
|
||||
|
||||
# task: trio.Task = trio.lowlevel.current_task()
|
||||
|
||||
'''
|
||||
# TODO: play with tricyle again, bc this is supposed to work
|
||||
# the way we want?
|
||||
#
|
||||
# TreeVar
|
||||
# task: trio.Task = trio.lowlevel.current_task()
|
||||
# curr_codec = _ctxvar_MsgCodec.get_in(task)
|
||||
|
||||
# ContextVar
|
||||
|
@ -249,46 +195,358 @@ def chk_codec_applied(
|
|||
# assert _ctxvar_MsgCodec in task_ctx
|
||||
# curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec]
|
||||
|
||||
# NOTE: currently we use this!
|
||||
# RunVar
|
||||
curr_codec: MsgCodec = _ctxvar_MsgCodec.get()
|
||||
curr_codec: MsgCodec = current_codec()
|
||||
last_read_codec = _ctxvar_MsgCodec.get()
|
||||
assert curr_codec is last_read_codec
|
||||
# assert curr_codec is last_read_codec
|
||||
|
||||
assert (
|
||||
(same_codec := expect_codec) is
|
||||
# returned from `mk_codec()`
|
||||
custom_codec is
|
||||
|
||||
# yielded value from `apply_codec()`
|
||||
enter_value is
|
||||
|
||||
# read from current task's `contextvars.Context`
|
||||
curr_codec is
|
||||
|
||||
# public API for all of the above
|
||||
current_codec()
|
||||
last_read_codec
|
||||
|
||||
# the default `msgspec` settings
|
||||
is not _codec._def_msgspec_codec
|
||||
is not _codec._def_tractor_codec
|
||||
)
|
||||
|
||||
if enter_value:
|
||||
enter_value is same_codec
|
||||
|
||||
|
||||
def iter_maybe_sends(
|
||||
send_items: dict[Union[Type], Any] | list[tuple],
|
||||
ipc_pld_spec: Union[Type] | Any,
|
||||
add_codec_hooks: bool,
|
||||
|
||||
codec: MsgCodec|None = None,
|
||||
|
||||
) -> tuple[Any, bool]:
|
||||
|
||||
if isinstance(send_items, dict):
|
||||
send_items = send_items.items()
|
||||
|
||||
for (
|
||||
send_type_spec,
|
||||
send_value,
|
||||
) in send_items:
|
||||
|
||||
expect_roundtrip: bool = False
|
||||
|
||||
# values-to-typespec santiy
|
||||
send_type = type(send_value)
|
||||
assert send_type == send_type_spec or (
|
||||
(subtypes := getattr(send_type_spec, '__args__', None))
|
||||
and send_type in subtypes
|
||||
)
|
||||
|
||||
spec_subtypes: set[Union[Type]] = (
|
||||
getattr(
|
||||
ipc_pld_spec,
|
||||
'__args__',
|
||||
{ipc_pld_spec,},
|
||||
)
|
||||
)
|
||||
send_in_spec: bool = (
|
||||
send_type == ipc_pld_spec
|
||||
or (
|
||||
ipc_pld_spec != Any
|
||||
and # presume `Union` of types
|
||||
send_type in spec_subtypes
|
||||
)
|
||||
or (
|
||||
ipc_pld_spec == Any
|
||||
and
|
||||
send_type != NamespacePath
|
||||
)
|
||||
)
|
||||
expect_roundtrip = (
|
||||
send_in_spec
|
||||
# any spec should support all other
|
||||
# builtin py values that we send
|
||||
# except our custom nsp type which
|
||||
# we should be able to send as long
|
||||
# as we provide the custom codec hooks.
|
||||
or (
|
||||
ipc_pld_spec == Any
|
||||
and
|
||||
send_type == NamespacePath
|
||||
and
|
||||
add_codec_hooks
|
||||
)
|
||||
)
|
||||
|
||||
if codec is not None:
|
||||
# XXX FIRST XXX ensure roundtripping works
|
||||
# before touching any IPC primitives/APIs.
|
||||
wire_bytes: bytes = codec.encode(
|
||||
Started(
|
||||
cid='blahblah',
|
||||
pld=send_value,
|
||||
)
|
||||
)
|
||||
# NOTE: demonstrates the decoder loading
|
||||
# to via our native SCIPP msg-spec
|
||||
# (structurred-conc-inter-proc-protocol)
|
||||
# implemented as per,
|
||||
try:
|
||||
msg: Started = codec.decode(wire_bytes)
|
||||
if not expect_roundtrip:
|
||||
pytest.fail(
|
||||
f'NOT-EXPECTED able to roundtrip value given spec:\n'
|
||||
f'ipc_pld_spec -> {ipc_pld_spec}\n'
|
||||
f'value -> {send_value}: {send_type}\n'
|
||||
)
|
||||
|
||||
pld = msg.pld
|
||||
assert pld == send_value
|
||||
|
||||
except ValidationError:
|
||||
if expect_roundtrip:
|
||||
pytest.fail(
|
||||
f'EXPECTED to roundtrip value given spec:\n'
|
||||
f'ipc_pld_spec -> {ipc_pld_spec}\n'
|
||||
f'value -> {send_value}: {send_type}\n'
|
||||
)
|
||||
|
||||
yield (
|
||||
str(send_type),
|
||||
send_value,
|
||||
expect_roundtrip,
|
||||
)
|
||||
|
||||
|
||||
def dec_type_union(
|
||||
type_names: list[str],
|
||||
) -> Type:
|
||||
'''
|
||||
Look up types by name, compile into a list and then create and
|
||||
return a `typing.Union` from the full set.
|
||||
|
||||
'''
|
||||
import importlib
|
||||
types: list[Type] = []
|
||||
for type_name in type_names:
|
||||
for ns in [
|
||||
typing,
|
||||
importlib.import_module(__name__),
|
||||
]:
|
||||
if type_ref := getattr(
|
||||
ns,
|
||||
type_name,
|
||||
False,
|
||||
):
|
||||
types.append(type_ref)
|
||||
|
||||
# special case handling only..
|
||||
# ipc_pld_spec: Union[Type] = eval(
|
||||
# pld_spec_str,
|
||||
# {}, # globals
|
||||
# {'typing': typing}, # locals
|
||||
# )
|
||||
|
||||
return Union[*types]
|
||||
|
||||
|
||||
def enc_type_union(
|
||||
union_or_type: Union[Type]|Type,
|
||||
) -> list[str]:
|
||||
'''
|
||||
Encode a type-union or single type to a list of type-name-strings
|
||||
ready for IPC interchange.
|
||||
|
||||
'''
|
||||
type_strs: list[str] = []
|
||||
for typ in getattr(
|
||||
union_or_type,
|
||||
'__args__',
|
||||
{union_or_type,},
|
||||
):
|
||||
type_strs.append(typ.__qualname__)
|
||||
|
||||
return type_strs
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def send_back_nsp(
|
||||
ctx: Context,
|
||||
expect_debug: bool,
|
||||
pld_spec_type_strs: list[str],
|
||||
add_hooks: bool,
|
||||
started_msg_bytes: bytes,
|
||||
expect_ipc_send: dict[str, tuple[Any, bool]],
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Setup up a custom codec to load instances of `NamespacePath`
|
||||
and ensure we can round trip a func ref with our parent.
|
||||
|
||||
'''
|
||||
# debug mode sanity check (prolly superfluous but, meh)
|
||||
assert expect_debug == _state.debug_mode()
|
||||
|
||||
# init state in sub-actor should be default
|
||||
chk_codec_applied(
|
||||
expect_codec=_codec._def_tractor_codec,
|
||||
)
|
||||
|
||||
# load pld spec from input str
|
||||
ipc_pld_spec = dec_type_union(
|
||||
pld_spec_type_strs,
|
||||
)
|
||||
pld_spec_str = str(ipc_pld_spec)
|
||||
|
||||
# same as on parent side config.
|
||||
nsp_codec: MsgCodec = mk_custom_codec(
|
||||
pld_spec=ipc_pld_spec,
|
||||
add_hooks=add_hooks,
|
||||
)
|
||||
with apply_codec(nsp_codec) as codec:
|
||||
chk_codec_applied(
|
||||
expect_codec=nsp_codec,
|
||||
enter_value=codec,
|
||||
)
|
||||
|
||||
print(
|
||||
'CHILD attempting `Started`-bytes DECODE..\n'
|
||||
)
|
||||
try:
|
||||
msg: Started = nsp_codec.decode(started_msg_bytes)
|
||||
expected_pld_spec_str: str = msg.pld
|
||||
assert pld_spec_str == expected_pld_spec_str
|
||||
|
||||
# TODO: maybe we should add our own wrapper error so as to
|
||||
# be interchange-lib agnostic?
|
||||
# -[ ] the error type is wtv is raised from the hook so we
|
||||
# could also require a type-class of errors for
|
||||
# indicating whether the hook-failure can be handled by
|
||||
# a nasty-dialog-unprot sub-sys?
|
||||
except ValidationError:
|
||||
|
||||
# NOTE: only in the `Any` spec case do we expect this to
|
||||
# work since otherwise no spec covers a plain-ol'
|
||||
# `.pld: str`
|
||||
if pld_spec_str == 'Any':
|
||||
raise
|
||||
else:
|
||||
print(
|
||||
'CHILD (correctly) unable to DECODE `Started`-bytes\n'
|
||||
f'{started_msg_bytes}\n'
|
||||
)
|
||||
|
||||
iter_send_val_items = iter(expect_ipc_send.values())
|
||||
sent: list[Any] = []
|
||||
for send_value, expect_send in iter_send_val_items:
|
||||
try:
|
||||
print(
|
||||
f'CHILD attempting to `.started({send_value})`\n'
|
||||
f'=> expect_send: {expect_send}\n'
|
||||
f'SINCE, ipc_pld_spec: {ipc_pld_spec}\n'
|
||||
f'AND, codec: {codec}\n'
|
||||
)
|
||||
await ctx.started(send_value)
|
||||
sent.append(send_value)
|
||||
if not expect_send:
|
||||
|
||||
# XXX NOTE XXX THIS WON'T WORK WITHOUT SPECIAL
|
||||
# `str` handling! or special debug mode IPC
|
||||
# msgs!
|
||||
# await tractor.pause()
|
||||
|
||||
raise RuntimeError(
|
||||
# pytest.fail(
|
||||
f'NOT-EXPECTED able to roundtrip value given spec:\n'
|
||||
f'ipc_pld_spec -> {ipc_pld_spec}\n'
|
||||
f'value -> {send_value}: {type(send_value)}\n'
|
||||
)
|
||||
|
||||
break # move on to streaming block..
|
||||
|
||||
except NotImplementedError:
|
||||
print('FAILED ENCODE!')
|
||||
|
||||
except tractor.MsgTypeError:
|
||||
# await tractor.pause()
|
||||
if expect_send:
|
||||
pytest.fail(
|
||||
f'EXPECTED to `.started()` value given spec:\n'
|
||||
f'ipc_pld_spec -> {ipc_pld_spec}\n'
|
||||
f'value -> {send_value}: {type(send_value)}\n'
|
||||
)
|
||||
|
||||
async with ctx.open_stream() as ipc:
|
||||
for send_value, expect_send in iter_send_val_items:
|
||||
send_type: Type = type(send_value)
|
||||
print(
|
||||
'CHILD report on send value\n'
|
||||
f'ipc_pld_spec: {ipc_pld_spec}\n'
|
||||
f'expect_send: {expect_send}\n'
|
||||
f'val: {send_value}\n'
|
||||
)
|
||||
try:
|
||||
await ipc.send(send_value)
|
||||
sent.append(send_value)
|
||||
if not expect_send:
|
||||
pytest.fail(
|
||||
f'NOT-EXPECTED able to roundtrip value given spec:\n'
|
||||
f'ipc_pld_spec -> {ipc_pld_spec}\n'
|
||||
f'value -> {send_value}: {send_type}\n'
|
||||
)
|
||||
except ValidationError:
|
||||
if expect_send:
|
||||
pytest.fail(
|
||||
f'EXPECTED to roundtrip value given spec:\n'
|
||||
f'ipc_pld_spec -> {ipc_pld_spec}\n'
|
||||
f'value -> {send_value}: {send_type}\n'
|
||||
)
|
||||
continue
|
||||
|
||||
assert (
|
||||
len(sent)
|
||||
==
|
||||
len([val
|
||||
for val, expect in
|
||||
expect_ipc_send.values()
|
||||
if expect is True])
|
||||
)
|
||||
|
||||
|
||||
def ex_func(*args):
|
||||
print(f'ex_func({args})')
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'ipc_pld_spec',
|
||||
[
|
||||
# _codec._def_msgspec_codec,
|
||||
Any,
|
||||
# _codec._def_tractor_codec,
|
||||
NamespacePath|None,
|
||||
NamespacePath,
|
||||
NamespacePath|None, # the "maybe" spec Bo
|
||||
],
|
||||
ids=[
|
||||
'any_type',
|
||||
'nsp_type',
|
||||
'maybe_nsp_type',
|
||||
]
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
'add_codec_hooks',
|
||||
[
|
||||
True,
|
||||
False,
|
||||
],
|
||||
ids=['use_codec_hooks', 'no_codec_hooks'],
|
||||
)
|
||||
def test_codec_hooks_mod(
|
||||
debug_mode: bool,
|
||||
ipc_pld_spec: Union[Type]|Any,
|
||||
# send_value: None|str|NamespacePath,
|
||||
add_codec_hooks: bool,
|
||||
):
|
||||
'''
|
||||
Audit the `.msg.MsgCodec` override apis details given our impl
|
||||
|
@ -297,17 +555,17 @@ def test_codec_hooks_mod(
|
|||
|
||||
'''
|
||||
async def main():
|
||||
nsp = NamespacePath.from_ref(ex_func)
|
||||
send_items: dict[Union, Any] = {
|
||||
Union[None]: None,
|
||||
Union[NamespacePath]: nsp,
|
||||
Union[str]: str(nsp),
|
||||
}
|
||||
|
||||
# task: trio.Task = trio.lowlevel.current_task()
|
||||
|
||||
# ContextVar
|
||||
# task_ctx: Context = task.context
|
||||
# assert _ctxvar_MsgCodec not in task_ctx
|
||||
|
||||
# TreeVar
|
||||
# def_codec: MsgCodec = _ctxvar_MsgCodec.get_in(task)
|
||||
def_codec = _ctxvar_MsgCodec.get()
|
||||
assert def_codec is _codec._def_tractor_codec
|
||||
# init default state for actor
|
||||
chk_codec_applied(
|
||||
expect_codec=_codec._def_tractor_codec,
|
||||
)
|
||||
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
|
@ -323,79 +581,97 @@ def test_codec_hooks_mod(
|
|||
# `NamespacePath`
|
||||
nsp_codec: MsgCodec = mk_custom_codec(
|
||||
pld_spec=ipc_pld_spec,
|
||||
add_hooks=add_codec_hooks,
|
||||
)
|
||||
with apply_codec(nsp_codec) as codec:
|
||||
chk_codec_applied(
|
||||
custom_codec=nsp_codec,
|
||||
expect_codec=nsp_codec,
|
||||
enter_value=codec,
|
||||
)
|
||||
|
||||
expect_ipc_send: dict[str, tuple[Any, bool]] = {}
|
||||
|
||||
report: str = (
|
||||
'Parent report on send values with\n'
|
||||
f'ipc_pld_spec: {ipc_pld_spec}\n'
|
||||
' ------ - ------\n'
|
||||
)
|
||||
for val_type_str, val, expect_send in iter_maybe_sends(
|
||||
send_items,
|
||||
ipc_pld_spec,
|
||||
add_codec_hooks=add_codec_hooks,
|
||||
):
|
||||
report += (
|
||||
f'send_value: {val}: {type(val)} '
|
||||
f'=> expect_send: {expect_send}\n'
|
||||
)
|
||||
expect_ipc_send[val_type_str] = (val, expect_send)
|
||||
|
||||
print(
|
||||
report +
|
||||
' ------ - ------\n'
|
||||
)
|
||||
assert len(expect_ipc_send) == len(send_items)
|
||||
# now try over real IPC with a the subactor
|
||||
# expect_ipc_rountrip: bool = True
|
||||
expected_started = Started(
|
||||
cid='cid',
|
||||
pld=str(ipc_pld_spec),
|
||||
)
|
||||
# build list of values we expect to receive from
|
||||
# the subactor.
|
||||
expect_to_send: list[Any] = [
|
||||
val
|
||||
for val, expect_send in expect_ipc_send.values()
|
||||
if expect_send
|
||||
]
|
||||
|
||||
pld_spec_type_strs: list[str] = enc_type_union(ipc_pld_spec)
|
||||
|
||||
# TODO: send the original nsp here and
|
||||
# test with `limit_msg_spec()` above?
|
||||
# await tractor.pause()
|
||||
print('PARENT opening IPC ctx!\n')
|
||||
async with (
|
||||
|
||||
p.open_context(
|
||||
send_back_nsp,
|
||||
# TODO: send the original nsp here and
|
||||
# test with `limit_msg_spec()` above?
|
||||
expect_debug=debug_mode,
|
||||
use_any_spec=(ipc_pld_spec==Any),
|
||||
|
||||
pld_spec_type_strs=pld_spec_type_strs,
|
||||
add_hooks=add_codec_hooks,
|
||||
started_msg_bytes=nsp_codec.encode(expected_started),
|
||||
expect_ipc_send=expect_ipc_send,
|
||||
) as (ctx, first),
|
||||
|
||||
ctx.open_stream() as ipc,
|
||||
):
|
||||
if ipc_pld_spec is NamespacePath:
|
||||
assert isinstance(first, NamespacePath)
|
||||
|
||||
# ensure codec is still applied across
|
||||
# `tractor.Context` + its embedded nursery.
|
||||
chk_codec_applied(
|
||||
expect_codec=nsp_codec,
|
||||
enter_value=codec,
|
||||
)
|
||||
print(
|
||||
'root: ENTERING CONTEXT BLOCK\n'
|
||||
f'type(first): {type(first)}\n'
|
||||
f'first: {first}\n'
|
||||
)
|
||||
# ensure codec is still applied across
|
||||
# `tractor.Context` + its embedded nursery.
|
||||
chk_codec_applied(
|
||||
custom_codec=nsp_codec,
|
||||
enter_value=codec,
|
||||
)
|
||||
expect_to_send.remove(first)
|
||||
|
||||
first_nsp = NamespacePath(first)
|
||||
# TODO: explicit values we expect depending on
|
||||
# codec config!
|
||||
# assert first == first_val
|
||||
# assert first == f'{__name__}:ex_func'
|
||||
|
||||
# ensure roundtripping works
|
||||
wire_bytes: bytes = nsp_codec.encode(
|
||||
Started(
|
||||
cid=ctx.cid,
|
||||
pld=first_nsp
|
||||
async for next_sent in ipc:
|
||||
print(
|
||||
'Child sent next value\n'
|
||||
f'{next_sent}: {type(next_sent)}\n'
|
||||
)
|
||||
)
|
||||
msg: Started = nsp_codec.decode(wire_bytes)
|
||||
pld = msg.pld
|
||||
assert pld == first_nsp
|
||||
expect_to_send.remove(next_sent)
|
||||
|
||||
# try a manual decode of the started msg+pld
|
||||
|
||||
# TODO: actually get the decoder loading
|
||||
# to native once we spec our SCIPP msgspec
|
||||
# (structurred-conc-inter-proc-protocol)
|
||||
# implemented as per,
|
||||
# https://github.com/goodboy/tractor/issues/36
|
||||
#
|
||||
if ipc_pld_spec is NamespacePath:
|
||||
assert isinstance(first, NamespacePath)
|
||||
|
||||
# `Any`-payload-spec case
|
||||
else:
|
||||
assert isinstance(first, str)
|
||||
assert first == f'{__name__}:ex_func'
|
||||
|
||||
await ipc.send(first)
|
||||
|
||||
with trio.move_on_after(.6):
|
||||
async for msg in ipc:
|
||||
print(msg)
|
||||
|
||||
# TODO: as per above
|
||||
# assert isinstance(msg, NamespacePath)
|
||||
assert isinstance(msg, str)
|
||||
await ipc.send(msg)
|
||||
await trio.sleep(0.1)
|
||||
# all sent values should have arrived!
|
||||
assert not expect_to_send
|
||||
|
||||
await p.cancel_actor()
|
||||
|
||||
|
@ -467,7 +743,7 @@ def chk_pld_type(
|
|||
|
||||
roundtrip: bool|None = None
|
||||
pld_spec_msg_names: list[str] = [
|
||||
td.__name__ for td in types._payload_spec_msgs
|
||||
td.__name__ for td in _payload_msgs
|
||||
]
|
||||
for typedef in msg_types:
|
||||
|
||||
|
|
Loading…
Reference in New Issue