Compare commits

..

No commits in common. "8077487efc2bbd5f43724be27c6ef2cbe400e649" and "81349f7c09acf9098b560f44e32cb27d91e41242" have entirely different histories.

6 changed files with 229 additions and 613 deletions

View File

@ -5,6 +5,7 @@ Low-level functional audits for our
B~) B~)
''' '''
import typing
from typing import ( from typing import (
Any, Any,
Type, Type,
@ -14,16 +15,13 @@ from typing import (
from msgspec import ( from msgspec import (
structs, structs,
msgpack, msgpack,
Raw,
Struct, Struct,
ValidationError, ValidationError,
) )
import pytest import pytest
import trio
import tractor import tractor
from tractor import ( from tractor import (
Actor,
_state, _state,
MsgTypeError, MsgTypeError,
Context, Context,
@ -31,13 +29,10 @@ from tractor import (
from tractor.msg import ( from tractor.msg import (
_codec, _codec,
_ctxvar_MsgCodec, _ctxvar_MsgCodec,
_exts,
NamespacePath, NamespacePath,
MsgCodec, MsgCodec,
MsgDec,
mk_codec, mk_codec,
mk_dec,
apply_codec, apply_codec,
current_codec, current_codec,
) )
@ -48,34 +43,101 @@ from tractor.msg.types import (
Started, Started,
mk_msg_spec, mk_msg_spec,
) )
from tractor.msg._ops import ( import trio
limit_plds,
)
def mk_custom_codec( def mk_custom_codec(
pld_spec: Union[Type]|Any,
add_hooks: bool, add_hooks: bool,
) -> tuple[ ) -> MsgCodec:
MsgCodec, # encode to send
MsgDec, # pld receive-n-decode
]:
''' '''
Create custom `msgpack` enc/dec-hooks and set a `Decoder` Create custom `msgpack` enc/dec-hooks and set a `Decoder`
which only loads `pld_spec` (like `NamespacePath`) types. which only loads `pld_spec` (like `NamespacePath`) types.
''' '''
uid: tuple[str, str] = tractor.current_actor().uid
# XXX NOTE XXX: despite defining `NamespacePath` as a type # XXX NOTE XXX: despite defining `NamespacePath` as a type
# field on our `PayloadMsg.pld`, we still need a enc/dec_hook() pair # field on our `PayloadMsg.pld`, we still need a enc/dec_hook() pair
# to cast to/from that type on the wire. See the docs: # to cast to/from that type on the wire. See the docs:
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
# if pld_spec is Any: def enc_nsp(obj: Any) -> Any:
# pld_spec = Raw print(f'{uid} ENC HOOK')
match obj:
case NamespacePath():
print(
f'{uid}: `NamespacePath`-Only ENCODE?\n'
f'obj-> `{obj}`: {type(obj)}\n'
)
# if type(obj) != NamespacePath:
# breakpoint()
return str(obj)
print(
f'{uid}\n'
'CUSTOM ENCODE\n'
f'obj-arg-> `{obj}`: {type(obj)}\n'
)
logmsg: str = (
f'{uid}\n'
'FAILED ENCODE\n'
f'obj-> `{obj}: {type(obj)}`\n'
)
raise NotImplementedError(logmsg)
def dec_nsp(
obj_type: Type,
obj: Any,
) -> Any:
print(
f'{uid}\n'
'CUSTOM DECODE\n'
f'type-arg-> {obj_type}\n'
f'obj-arg-> `{obj}`: {type(obj)}\n'
)
nsp = None
if (
obj_type is NamespacePath
and isinstance(obj, str)
and ':' in obj
):
nsp = NamespacePath(obj)
# TODO: we could built a generic handler using
# JUST matching the obj_type part?
# nsp = obj_type(obj)
if nsp:
print(f'Returning NSP instance: {nsp}')
return nsp
logmsg: str = (
f'{uid}\n'
'FAILED DECODE\n'
f'type-> {obj_type}\n'
f'obj-arg-> `{obj}`: {type(obj)}\n\n'
f'current codec:\n'
f'{current_codec()}\n'
)
# TODO: figure out the ignore subsys for this!
# -[ ] option whether to defense-relay backc the msg
# inside an `Invalid`/`Ignore`
# -[ ] how to make this handling pluggable such that a
# `Channel`/`MsgTransport` can intercept and process
# back msgs either via exception handling or some other
# signal?
log.warning(logmsg)
# NOTE: this delivers the invalid
# value up to `msgspec`'s decoding
# machinery for error raising.
return obj
# raise NotImplementedError(logmsg)
nsp_codec: MsgCodec = mk_codec( nsp_codec: MsgCodec = mk_codec(
# ipc_pld_spec=Raw, # default! ipc_pld_spec=pld_spec,
# NOTE XXX: the encode hook MUST be used no matter what since # NOTE XXX: the encode hook MUST be used no matter what since
# our `NamespacePath` is not any of a `Any` native type nor # our `NamespacePath` is not any of a `Any` native type nor
@ -91,9 +153,8 @@ def mk_custom_codec(
# XXX NOTE: pretty sure this is mutex with the `type=` to # XXX NOTE: pretty sure this is mutex with the `type=` to
# `Decoder`? so it won't work in tandem with the # `Decoder`? so it won't work in tandem with the
# `ipc_pld_spec` passed above? # `ipc_pld_spec` passed above?
ext_types=[NamespacePath], dec_hook=dec_nsp if add_hooks else None,
) )
# dec_hook=dec_nsp if add_hooks else None,
return nsp_codec return nsp_codec
@ -247,13 +308,64 @@ def iter_maybe_sends(
) )
def dec_type_union(
type_names: list[str],
) -> Type:
'''
Look up types by name, compile into a list and then create and
return a `typing.Union` from the full set.
'''
import importlib
types: list[Type] = []
for type_name in type_names:
for mod in [
typing,
importlib.import_module(__name__),
]:
if type_ref := getattr(
mod,
type_name,
False,
):
types.append(type_ref)
# special case handling only..
# ipc_pld_spec: Union[Type] = eval(
# pld_spec_str,
# {}, # globals
# {'typing': typing}, # locals
# )
return Union[*types]
def enc_type_union(
union_or_type: Union[Type]|Type,
) -> list[str]:
'''
Encode a type-union or single type to a list of type-name-strings
ready for IPC interchange.
'''
type_strs: list[str] = []
for typ in getattr(
union_or_type,
'__args__',
{union_or_type,},
):
type_strs.append(typ.__qualname__)
return type_strs
@tractor.context @tractor.context
async def send_back_values( async def send_back_values(
ctx: Context, ctx: Context,
expect_debug: bool, expect_debug: bool,
pld_spec_type_strs: list[str], pld_spec_type_strs: list[str],
add_hooks: bool, add_hooks: bool,
# started_msg_bytes: bytes, started_msg_bytes: bytes,
expect_ipc_send: dict[str, tuple[Any, bool]], expect_ipc_send: dict[str, tuple[Any, bool]],
) -> None: ) -> None:
@ -273,43 +385,31 @@ async def send_back_values(
) )
# load pld spec from input str # load pld spec from input str
ipc_pld_spec = _exts.dec_type_union( ipc_pld_spec = dec_type_union(
pld_spec_type_strs, pld_spec_type_strs,
) )
pld_spec_str = str(ipc_pld_spec) pld_spec_str = str(ipc_pld_spec)
# same as on parent side config. # same as on parent side config.
nsp_codec: MsgCodec = mk_custom_codec( nsp_codec: MsgCodec = mk_custom_codec(
pld_spec=ipc_pld_spec,
add_hooks=add_hooks, add_hooks=add_hooks,
) )
with ( with (
apply_codec(nsp_codec) as codec, apply_codec(nsp_codec) as codec,
limit_plds(ipc_pld_spec) as codec,
): ):
# we SHOULD NOT be swapping the global codec since it breaks
# `Context.starte()` roundtripping checks!
chk_codec_applied( chk_codec_applied(
expect_codec=nsp_codec, expect_codec=nsp_codec,
enter_value=codec,
) )
# XXX SO NOT THIS!
# chk_codec_applied(
# expect_codec=nsp_codec,
# enter_value=codec,
# )
print( print(
f'{uid}: attempting `Started`-bytes DECODE..\n' f'{uid}: attempting `Started`-bytes DECODE..\n'
) )
try: try:
# msg: Started = nsp_codec.decode(started_msg_bytes) msg: Started = nsp_codec.decode(started_msg_bytes)
expected_pld_spec_str: str = msg.pld
ipc_spec: Type = ctx._pld_rx._pld_dec.spec assert pld_spec_str == expected_pld_spec_str
expected_pld_spec_str: str = str(ipc_spec)
assert (
pld_spec_str == expected_pld_spec_str
and
ipc_pld_spec == ipc_spec
)
# TODO: maybe we should add our own wrapper error so as to # TODO: maybe we should add our own wrapper error so as to
# be interchange-lib agnostic? # be interchange-lib agnostic?
@ -327,15 +427,12 @@ async def send_back_values(
else: else:
print( print(
f'{uid}: (correctly) unable to DECODE `Started`-bytes\n' f'{uid}: (correctly) unable to DECODE `Started`-bytes\n'
# f'{started_msg_bytes}\n' f'{started_msg_bytes}\n'
) )
iter_send_val_items = iter(expect_ipc_send.values()) iter_send_val_items = iter(expect_ipc_send.values())
sent: list[Any] = [] sent: list[Any] = []
for ( for send_value, expect_send in iter_send_val_items:
send_value,
expect_send,
) in iter_send_val_items:
try: try:
print( print(
f'{uid}: attempting to `.started({send_value})`\n' f'{uid}: attempting to `.started({send_value})`\n'
@ -360,20 +457,16 @@ async def send_back_values(
break # move on to streaming block.. break # move on to streaming block..
except tractor.MsgTypeError as _mte: except tractor.MsgTypeError:
mte = _mte await tractor.pause()
if expect_send: if expect_send:
raise RuntimeError( raise RuntimeError(
f'EXPECTED to `.started()` value given spec ??\n\n' f'EXPECTED to `.started()` value given spec:\n'
f'ipc_pld_spec -> {ipc_pld_spec}\n' f'ipc_pld_spec -> {ipc_pld_spec}\n'
f'value -> {send_value}: {type(send_value)}\n' f'value -> {send_value}: {type(send_value)}\n'
) )
# await tractor.pause()
raise mte
async with ctx.open_stream() as ipc: async with ctx.open_stream() as ipc:
print( print(
f'{uid}: Entering streaming block to send remaining values..' f'{uid}: Entering streaming block to send remaining values..'
@ -437,6 +530,10 @@ async def send_back_values(
# ) # )
def ex_func(*args):
print(f'ex_func({args})')
@pytest.mark.parametrize( @pytest.mark.parametrize(
'ipc_pld_spec', 'ipc_pld_spec',
[ [
@ -496,6 +593,7 @@ def test_codec_hooks_mod(
# - codec modified with hooks -> decode nsp as # - codec modified with hooks -> decode nsp as
# `NamespacePath` # `NamespacePath`
nsp_codec: MsgCodec = mk_custom_codec( nsp_codec: MsgCodec = mk_custom_codec(
pld_spec=ipc_pld_spec,
add_hooks=add_codec_hooks, add_hooks=add_codec_hooks,
) )
with apply_codec(nsp_codec) as codec: with apply_codec(nsp_codec) as codec:
@ -511,11 +609,7 @@ def test_codec_hooks_mod(
f'ipc_pld_spec: {ipc_pld_spec}\n' f'ipc_pld_spec: {ipc_pld_spec}\n'
' ------ - ------\n' ' ------ - ------\n'
) )
for ( for val_type_str, val, expect_send in iter_maybe_sends(
val_type_str,
val,
expect_send,
)in iter_maybe_sends(
send_items, send_items,
ipc_pld_spec, ipc_pld_spec,
add_codec_hooks=add_codec_hooks, add_codec_hooks=add_codec_hooks,
@ -524,10 +618,7 @@ def test_codec_hooks_mod(
f'send_value: {val}: {type(val)} ' f'send_value: {val}: {type(val)} '
f'=> expect_send: {expect_send}\n' f'=> expect_send: {expect_send}\n'
) )
expect_ipc_send[val_type_str] = ( expect_ipc_send[val_type_str] = (val, expect_send)
val,
expect_send,
)
print( print(
report + report +
@ -536,25 +627,9 @@ def test_codec_hooks_mod(
assert len(expect_ipc_send) == len(send_items) assert len(expect_ipc_send) == len(send_items)
# now try over real IPC with a the subactor # now try over real IPC with a the subactor
# expect_ipc_rountrip: bool = True # expect_ipc_rountrip: bool = True
if (
subtypes := getattr(
ipc_pld_spec, '__args__', False
)
):
pld_types_str: str = '|'.join(subtypes)
# breakpoint()
else:
# TODO, use `.msg._exts` utils instead of this!
pld_types_str: str = ipc_pld_spec.__name__
expected_started = Started( expected_started = Started(
cid='cid', cid='cid',
# pld=str(pld_types_str), pld=str(ipc_pld_spec),
pld=ipc_pld_spec,
)
started_msg_bytes: bytes = nsp_codec.encode(
expected_started,
) )
# build list of values we expect to receive from # build list of values we expect to receive from
# the subactor. # the subactor.
@ -564,7 +639,7 @@ def test_codec_hooks_mod(
if expect_send if expect_send
] ]
pld_spec_type_strs: list[str] = _exts.enc_type_union(ipc_pld_spec) pld_spec_type_strs: list[str] = enc_type_union(ipc_pld_spec)
# XXX should raise an mte (`MsgTypeError`) # XXX should raise an mte (`MsgTypeError`)
# when `add_codec_hooks == False` bc the input # when `add_codec_hooks == False` bc the input
@ -580,7 +655,7 @@ def test_codec_hooks_mod(
expect_debug=debug_mode, expect_debug=debug_mode,
pld_spec_type_strs=pld_spec_type_strs, pld_spec_type_strs=pld_spec_type_strs,
add_hooks=add_codec_hooks, add_hooks=add_codec_hooks,
started_msg_bytes=started_msg_bytes, started_msg_bytes=nsp_codec.encode(expected_started),
# XXX NOTE bc we send a `NamespacePath` in this kwarg # XXX NOTE bc we send a `NamespacePath` in this kwarg
expect_ipc_send=expect_ipc_send, expect_ipc_send=expect_ipc_send,
@ -598,8 +673,6 @@ def test_codec_hooks_mod(
# test with `limit_msg_spec()` above? # test with `limit_msg_spec()` above?
# await tractor.pause() # await tractor.pause()
print('PARENT opening IPC ctx!\n') print('PARENT opening IPC ctx!\n')
ctx: tractor.Context
ipc: tractor.MsgStream
async with ( async with (
# XXX should raise an mte (`MsgTypeError`) # XXX should raise an mte (`MsgTypeError`)
@ -801,16 +874,9 @@ def chk_pld_type(
return roundtrip return roundtrip
# ?TODO? remove since covered in the newer `test_pldrx_limiting`?
def test_limit_msgspec( def test_limit_msgspec(
debug_mode: bool, debug_mode: bool,
): ):
'''
Internals unit testing to verify that type-limiting an IPC ctx's
msg spec with `Pldrx.limit_plds()` results in various
encapsulated `msgspec` object settings and state.
'''
async def main(): async def main():
async with tractor.open_root_actor( async with tractor.open_root_actor(
debug_mode=debug_mode, debug_mode=debug_mode,
@ -849,188 +915,3 @@ def test_limit_msgspec(
# breakpoint() # breakpoint()
trio.run(main) trio.run(main)
def enc_nsp(obj: Any) -> Any:
actor: Actor = tractor.current_actor(
err_on_no_runtime=False,
)
uid: tuple[str, str]|None = None if not actor else actor.uid
print(f'{uid} ENC HOOK')
match obj:
# case NamespacePath()|str():
case NamespacePath():
encoded: str = str(obj)
print(
f'----- ENCODING `NamespacePath` as `str` ------\n'
f'|_obj:{type(obj)!r} = {obj!r}\n'
f'|_encoded: str = {encoded!r}\n'
)
# if type(obj) != NamespacePath:
# breakpoint()
return encoded
case _:
logmsg: str = (
f'{uid}\n'
'FAILED ENCODE\n'
f'obj-> `{obj}: {type(obj)}`\n'
)
raise NotImplementedError(logmsg)
def dec_nsp(
obj_type: Type,
obj: Any,
) -> Any:
# breakpoint()
actor: Actor = tractor.current_actor(
err_on_no_runtime=False,
)
uid: tuple[str, str]|None = None if not actor else actor.uid
print(
f'{uid}\n'
'CUSTOM DECODE\n'
f'type-arg-> {obj_type}\n'
f'obj-arg-> `{obj}`: {type(obj)}\n'
)
nsp = None
# XXX, never happens right?
if obj_type is Raw:
breakpoint()
if (
obj_type is NamespacePath
and isinstance(obj, str)
and ':' in obj
):
nsp = NamespacePath(obj)
# TODO: we could built a generic handler using
# JUST matching the obj_type part?
# nsp = obj_type(obj)
if nsp:
print(f'Returning NSP instance: {nsp}')
return nsp
logmsg: str = (
f'{uid}\n'
'FAILED DECODE\n'
f'type-> {obj_type}\n'
f'obj-arg-> `{obj}`: {type(obj)}\n\n'
f'current codec:\n'
f'{current_codec()}\n'
)
# TODO: figure out the ignore subsys for this!
# -[ ] option whether to defense-relay backc the msg
# inside an `Invalid`/`Ignore`
# -[ ] how to make this handling pluggable such that a
# `Channel`/`MsgTransport` can intercept and process
# back msgs either via exception handling or some other
# signal?
log.warning(logmsg)
# NOTE: this delivers the invalid
# value up to `msgspec`'s decoding
# machinery for error raising.
return obj
# raise NotImplementedError(logmsg)
def ex_func(*args):
'''
A mod level func we can ref and load via our `NamespacePath`
python-object pointer `str` subtype.
'''
print(f'ex_func({args})')
@pytest.mark.parametrize(
'add_codec_hooks',
[
True,
False,
],
ids=['use_codec_hooks', 'no_codec_hooks'],
)
def test_custom_extension_types(
debug_mode: bool,
add_codec_hooks: bool
):
'''
Verify that a `MsgCodec` (used for encoding all outbound IPC msgs
and decoding all inbound `PayloadMsg`s) and a paired `MsgDec`
(used for decoding the `PayloadMsg.pld: Raw` received within a given
task's ipc `Context` scope) can both send and receive "extension types"
as supported via custom converter hooks passed to `msgspec`.
'''
nsp_pld_dec: MsgDec = mk_dec(
spec=None, # ONLY support the ext type
dec_hook=dec_nsp if add_codec_hooks else None,
ext_types=[NamespacePath],
)
nsp_codec: MsgCodec = mk_codec(
# ipc_pld_spec=Raw, # default!
# NOTE XXX: the encode hook MUST be used no matter what since
# our `NamespacePath` is not any of a `Any` native type nor
# a `msgspec.Struct` subtype - so `msgspec` has no way to know
# how to encode it unless we provide the custom hook.
#
# AGAIN that is, regardless of whether we spec an
# `Any`-decoded-pld the enc has no knowledge (by default)
# how to enc `NamespacePath` (nsp), so we add a custom
# hook to do that ALWAYS.
enc_hook=enc_nsp if add_codec_hooks else None,
# XXX NOTE: pretty sure this is mutex with the `type=` to
# `Decoder`? so it won't work in tandem with the
# `ipc_pld_spec` passed above?
ext_types=[NamespacePath],
# TODO? is it useful to have the `.pld` decoded *prior* to
# the `PldRx`?? like perf or mem related?
# ext_dec=nsp_pld_dec,
)
if add_codec_hooks:
assert nsp_codec.dec.dec_hook is None
# TODO? if we pass `ext_dec` above?
# assert nsp_codec.dec.dec_hook is dec_nsp
assert nsp_codec.enc.enc_hook is enc_nsp
nsp = NamespacePath.from_ref(ex_func)
try:
nsp_bytes: bytes = nsp_codec.encode(nsp)
nsp_rt_sin_msg = nsp_pld_dec.decode(nsp_bytes)
nsp_rt_sin_msg.load_ref() is ex_func
except TypeError:
if not add_codec_hooks:
pass
try:
msg_bytes: bytes = nsp_codec.encode(
Started(
cid='cid',
pld=nsp,
)
)
# since the ext-type obj should also be set as the msg.pld
assert nsp_bytes in msg_bytes
started_rt: Started = nsp_codec.decode(msg_bytes)
pld: Raw = started_rt.pld
assert isinstance(pld, Raw)
nsp_rt: NamespacePath = nsp_pld_dec.decode(pld)
assert isinstance(nsp_rt, NamespacePath)
# in obj comparison terms they should be the same
assert nsp_rt == nsp
# ensure we've decoded to ext type!
assert nsp_rt.load_ref() is ex_func
except TypeError:
if not add_codec_hooks:
pass

View File

@ -33,7 +33,6 @@ from ._codec import (
apply_codec as apply_codec, apply_codec as apply_codec,
mk_codec as mk_codec, mk_codec as mk_codec,
mk_dec as mk_dec,
MsgCodec as MsgCodec, MsgCodec as MsgCodec,
MsgDec as MsgDec, MsgDec as MsgDec,
current_codec as current_codec, current_codec as current_codec,

View File

@ -61,7 +61,6 @@ from tractor.msg.pretty_struct import Struct
from tractor.msg.types import ( from tractor.msg.types import (
mk_msg_spec, mk_msg_spec,
MsgType, MsgType,
PayloadMsg,
) )
from tractor.log import get_logger from tractor.log import get_logger
@ -81,7 +80,6 @@ class MsgDec(Struct):
''' '''
_dec: msgpack.Decoder _dec: msgpack.Decoder
# _ext_types_box: Struct|None = None
@property @property
def dec(self) -> msgpack.Decoder: def dec(self) -> msgpack.Decoder:
@ -181,120 +179,21 @@ class MsgDec(Struct):
def mk_dec( def mk_dec(
spec: Union[Type[Struct]]|Type|None, spec: Union[Type[Struct]]|Any = Any,
# NOTE, required for ad-hoc type extensions to the underlying
# serialization proto (which is default `msgpack`),
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
dec_hook: Callable|None = None, dec_hook: Callable|None = None,
ext_types: list[Type]|None = None,
) -> MsgDec: ) -> MsgDec:
''' '''
Create an IPC msg decoder, a slightly higher level wrapper around Create an IPC msg decoder, normally used as the
a `msgspec.msgpack.Decoder` which provides, `PayloadMsg.pld: PayloadT` field decoder inside a `PldRx`.
- easier introspection of the underlying type spec via
the `.spec` and `.spec_str` attrs,
- `.hook` access to the `Decoder.dec_hook()`,
- automatic custom extension-types decode support when
`dec_hook()` is provided such that any `PayloadMsg.pld` tagged
as a type from from `ext_types` (presuming the `MsgCodec.encode()` also used
a `.enc_hook()`) is processed and constructed by a `PldRx` implicitily.
NOTE, as mentioned a `MsgDec` is normally used for `PayloadMsg.pld: PayloadT` field
decoding inside an IPC-ctx-oriented `PldRx`.
''' '''
if (
spec is None
and
ext_types is None
):
raise ValueError(
f'You must provide a type-spec for a msg decoder!\n'
f'The only time `spec=None` is permitted is if custom extension types '
f'are expected to be supported, in which case `ext_types` must be non-`None`'
f'and it is presumed that only the `ext_types` (supported by the paired `dec_hook()`) '
f'will be permitted within the type-`spec`!\n'
f'tpec = {spec!r}\n'
f'dec_hook = {dec_hook!r}\n'
f'ext_types = {ext_types!r}\n'
)
if dec_hook:
if ext_types is None:
raise ValueError(
f'If extending the serializable types with a custom decoder hook, '
f'you must also provide the expected type set `dec_hook()` will handle '
f'via the `ext_types: Union[Type]|None = None` argument!\n'
f'dec_hook = {dec_hook!r}\n'
f'ext_types = {ext_types!r}\n'
)
# XXX, i *thought* we would require a boxing struct as per docs,
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
# |_ see comment,
# > Note that typed deserialization is required for
# > successful roundtripping here, so we pass `MyMessage` to
# > `Decoder`.
#
# BUT, turns out as long as you spec a union with `Raw` it
# will work? kk B)
#
# maybe_box_struct = mk_boxed_ext_struct(ext_types)
spec = Raw | Union[*ext_types]
return MsgDec( return MsgDec(
_dec=msgpack.Decoder( _dec=msgpack.Decoder(
type=spec, # like `MsgType[Any]` type=spec, # like `MsgType[Any]`
dec_hook=dec_hook, dec_hook=dec_hook,
),
)
# TODO? remove since didn't end up needing this?
def mk_boxed_ext_struct(
ext_types: list[Type],
) -> Struct:
# NOTE, originally was to wrap non-msgpack-supported "extension
# types" in a field-typed boxing struct, see notes around the
# `dec_hook()` branch in `mk_dec()`.
ext_types_union = Union[*ext_types]
repr_ext_types_union: str = (
str(ext_types_union)
or
"|".join(ext_types)
)
BoxedExtType = msgspec.defstruct(
f'BoxedExts[{repr_ext_types_union}]',
fields=[
('boxed', ext_types_union),
],
)
return BoxedExtType
def unpack_spec_types(
spec: Union[Type]|Type,
) -> set[Type]:
'''
Given an input type-`spec`, either a lone type
or a `Union` of types (like `str|int|MyThing`),
return a set of individual types.
When `spec` is not a type-union returns `{spec,}`.
'''
spec_subtypes: set[Union[Type]] = (
getattr(
spec,
'__args__',
{spec,},
) )
) )
return spec_subtypes
def mk_msgspec_table( def mk_msgspec_table(
@ -374,8 +273,6 @@ class MsgCodec(Struct):
_dec: msgpack.Decoder _dec: msgpack.Decoder
_pld_spec: Type[Struct]|Raw|Any _pld_spec: Type[Struct]|Raw|Any
# _ext_types_box: Struct|None = None
def __repr__(self) -> str: def __repr__(self) -> str:
speclines: str = textwrap.indent( speclines: str = textwrap.indent(
pformat_msgspec(codec=self), pformat_msgspec(codec=self),
@ -442,14 +339,12 @@ class MsgCodec(Struct):
def encode( def encode(
self, self,
py_obj: Any|PayloadMsg, py_obj: Any,
use_buf: bool = False, use_buf: bool = False,
# ^-XXX-^ uhh why am i getting this? # ^-XXX-^ uhh why am i getting this?
# |_BufferError: Existing exports of data: object cannot be re-sized # |_BufferError: Existing exports of data: object cannot be re-sized
as_ext_type: bool = False,
) -> bytes: ) -> bytes:
''' '''
Encode input python objects to `msgpack` bytes for Encode input python objects to `msgpack` bytes for
@ -462,33 +357,8 @@ class MsgCodec(Struct):
if use_buf: if use_buf:
self._enc.encode_into(py_obj, self._buf) self._enc.encode_into(py_obj, self._buf)
return self._buf return self._buf
else:
return self._enc.encode(py_obj) return self._enc.encode(py_obj)
# TODO! REMOVE once i'm confident we won't ever need it!
#
# box: Struct = self._ext_types_box
# if (
# as_ext_type
# or
# (
# # XXX NOTE, auto-detect if the input type
# box
# and
# (ext_types := unpack_spec_types(
# spec=box.__annotations__['boxed'])
# )
# )
# ):
# match py_obj:
# # case PayloadMsg(pld=pld) if (
# # type(pld) in ext_types
# # ):
# # py_obj.pld = box(boxed=py_obj)
# # breakpoint()
# case _ if (
# type(py_obj) in ext_types
# ):
# py_obj = box(boxed=py_obj)
@property @property
def dec(self) -> msgpack.Decoder: def dec(self) -> msgpack.Decoder:
@ -508,30 +378,21 @@ class MsgCodec(Struct):
return self._dec.decode(msg) return self._dec.decode(msg)
# ?TODO? time to remove this finally? # [x] TODO: a sub-decoder system as well? => No!
#
# -[x] TODO: a sub-decoder system as well?
# => No! already re-architected to include a "payload-receiver"
# now found in `._ops`.
# #
# -[x] do we still want to try and support the sub-decoder with # -[x] do we still want to try and support the sub-decoder with
# `.Raw` technique in the case that the `Generic` approach gives # `.Raw` technique in the case that the `Generic` approach gives
# future grief? # future grief?
# => well YES but NO, since we went with the `PldRx` approach # => NO, since we went with the `PldRx` approach instead B)
# instead!
# #
# IF however you want to see the code that was staged for this # IF however you want to see the code that was staged for this
# from wayyy back, see the pure removal commit. # from wayyy back, see the pure removal commit.
def mk_codec( def mk_codec(
ipc_pld_spec: Union[Type[Struct]]|Any|Raw = Raw, # struct type unions set for `Decoder`
# tagged-struct-types-union set for `Decoder`ing of payloads, as # https://jcristharif.com/msgspec/structs.html#tagged-unions
# per https://jcristharif.com/msgspec/structs.html#tagged-unions. ipc_pld_spec: Union[Type[Struct]]|Any = Any,
# NOTE that the default `Raw` here **is very intentional** since
# the `PldRx._pld_dec: MsgDec` is responsible for per ipc-ctx-task
# decoding of msg-specs defined by the user as part of **their**
# `tractor` "app's" type-limited IPC msg-spec.
# TODO: offering a per-msg(-field) type-spec such that # TODO: offering a per-msg(-field) type-spec such that
# the fields can be dynamically NOT decoded and left as `Raw` # the fields can be dynamically NOT decoded and left as `Raw`
@ -544,18 +405,13 @@ def mk_codec(
libname: str = 'msgspec', libname: str = 'msgspec',
# settings for encoding-to-send extension-types, # proxy as `Struct(**kwargs)` for ad-hoc type extensions
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
# dec_hook: Callable|None = None, # ------ - ------
dec_hook: Callable|None = None,
enc_hook: Callable|None = None, enc_hook: Callable|None = None,
ext_types: list[Type]|None = None, # ------ - ------
# optionally provided msg-decoder from which we pull its,
# |_.dec_hook()
# |_.type
ext_dec: MsgDec|None = None
# #
# ?TODO? other params we might want to support
# Encoder: # Encoder:
# write_buffer_size=write_buffer_size, # write_buffer_size=write_buffer_size,
# #
@ -569,43 +425,26 @@ def mk_codec(
`msgspec` ;). `msgspec` ;).
''' '''
pld_spec = ipc_pld_spec # (manually) generate a msg-payload-spec for all relevant
if enc_hook: # god-boxing-msg subtypes, parameterizing the `PayloadMsg.pld: PayloadT`
if not ext_types: # for the decoder such that all sub-type msgs in our SCIPP
raise ValueError( # will automatically decode to a type-"limited" payload (`Struct`)
f'If extending the serializable types with a custom decoder hook, ' # object (set).
f'you must also provide the expected type set `enc_hook()` will handle '
f'via the `ext_types: Union[Type]|None = None` argument!\n'
f'enc_hook = {enc_hook!r}\n'
f'ext_types = {ext_types!r}\n'
)
dec_hook: Callable|None = None
if ext_dec:
dec: msgspec.Decoder = ext_dec.dec
dec_hook = dec.dec_hook
pld_spec |= dec.type
if ext_types:
pld_spec |= Union[*ext_types]
# (manually) generate a msg-spec (how appropes) for all relevant
# payload-boxing-struct-msg-types, parameterizing the
# `PayloadMsg.pld: PayloadT` for the decoder such that all msgs
# in our SC-RPC-protocol will automatically decode to
# a type-"limited" payload (`Struct`) object (set).
( (
ipc_msg_spec, ipc_msg_spec,
msg_types, msg_types,
) = mk_msg_spec( ) = mk_msg_spec(
payload_type_union=pld_spec, payload_type_union=ipc_pld_spec,
) )
assert len(ipc_msg_spec.__args__) == len(msg_types)
assert ipc_msg_spec
msg_spec_types: set[Type] = unpack_spec_types(ipc_msg_spec) # TODO: use this shim instead?
assert ( # bc.. unification, err somethin?
len(ipc_msg_spec.__args__) == len(msg_types) # dec: MsgDec = mk_dec(
and # spec=ipc_msg_spec,
len(msg_spec_types) == len(msg_types) # dec_hook=dec_hook,
) # )
dec = msgpack.Decoder( dec = msgpack.Decoder(
type=ipc_msg_spec, type=ipc_msg_spec,
@ -614,29 +453,22 @@ def mk_codec(
enc = msgpack.Encoder( enc = msgpack.Encoder(
enc_hook=enc_hook, enc_hook=enc_hook,
) )
codec = MsgCodec( codec = MsgCodec(
_enc=enc, _enc=enc,
_dec=dec, _dec=dec,
_pld_spec=pld_spec, _pld_spec=ipc_pld_spec,
) )
# sanity on expected backend support # sanity on expected backend support
assert codec.lib.__name__ == libname assert codec.lib.__name__ == libname
return codec return codec
# instance of the default `msgspec.msgpack` codec settings, i.e. # instance of the default `msgspec.msgpack` codec settings, i.e.
# no custom structs, hooks or other special types. # no custom structs, hooks or other special types.
# _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any)
# XXX NOTE XXX, this will break our `Context.start()` call!
#
# * by default we roundtrip the started pld-`value` and if you apply
# this codec (globally anyway with `apply_codec()`) then the
# `roundtripped` value will include a non-`.pld: Raw` which will
# then type-error on the consequent `._ops.validte_payload_msg()`..
#
_def_msgspec_codec: MsgCodec = mk_codec(
ipc_pld_spec=Any,
)
# The built-in IPC `Msg` spec. # The built-in IPC `Msg` spec.
# Our composing "shuttle" protocol which allows `tractor`-app code # Our composing "shuttle" protocol which allows `tractor`-app code
@ -644,13 +476,13 @@ _def_msgspec_codec: MsgCodec = mk_codec(
# https://jcristharif.com/msgspec/supported-types.html # https://jcristharif.com/msgspec/supported-types.html
# #
_def_tractor_codec: MsgCodec = mk_codec( _def_tractor_codec: MsgCodec = mk_codec(
ipc_pld_spec=Raw, # XXX should be default righ!? # TODO: use this for debug mode locking prot?
# ipc_pld_spec=Any,
ipc_pld_spec=Raw,
) )
# TODO: IDEALLY provides for per-`trio.Task` specificity of the
# -[x] TODO, IDEALLY provides for per-`trio.Task` specificity of the
# IPC msging codec used by the transport layer when doing # IPC msging codec used by the transport layer when doing
# `Channel.send()/.recv()` of wire data. # `Channel.send()/.recv()` of wire data.
# => impled as our `PldRx` which is `Context` scoped B)
# ContextVar-TODO: DIDN'T WORK, kept resetting in every new task to default!? # ContextVar-TODO: DIDN'T WORK, kept resetting in every new task to default!?
# _ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar( # _ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
@ -727,6 +559,17 @@ def apply_codec(
) )
token: Token = var.set(codec) token: Token = var.set(codec)
# ?TODO? for TreeVar approach which copies from the
# cancel-scope of the prior value, NOT the prior task
# See the docs:
# - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
# - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
# ^- see docs for @cm `.being()` API
# with _ctxvar_MsgCodec.being(codec):
# new = _ctxvar_MsgCodec.get()
# assert new is codec
# yield codec
try: try:
yield var.get() yield var.get()
finally: finally:
@ -737,19 +580,6 @@ def apply_codec(
) )
assert var.get() is orig assert var.get() is orig
# ?TODO? for TreeVar approach which copies from the
# cancel-scope of the prior value, NOT the prior task
#
# See the docs:
# - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
# - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
# ^- see docs for @cm `.being()` API
#
# with _ctxvar_MsgCodec.being(codec):
# new = _ctxvar_MsgCodec.get()
# assert new is codec
# yield codec
def current_codec() -> MsgCodec: def current_codec() -> MsgCodec:
''' '''
@ -769,7 +599,6 @@ def limit_msg_spec(
# -> related to the `MsgCodec._payload_decs` stuff above.. # -> related to the `MsgCodec._payload_decs` stuff above..
# tagged_structs: list[Struct]|None = None, # tagged_structs: list[Struct]|None = None,
hide_tb: bool = True,
**codec_kwargs, **codec_kwargs,
) -> MsgCodec: ) -> MsgCodec:
@ -780,7 +609,7 @@ def limit_msg_spec(
for all IPC contexts in use by the current `trio.Task`. for all IPC contexts in use by the current `trio.Task`.
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = True
curr_codec: MsgCodec = current_codec() curr_codec: MsgCodec = current_codec()
msgspec_codec: MsgCodec = mk_codec( msgspec_codec: MsgCodec = mk_codec(
ipc_pld_spec=payload_spec, ipc_pld_spec=payload_spec,

View File

@ -1,90 +0,0 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Type-extension-utils for codec-ing (python) objects not
covered by the `msgspec.msgpack` protocol.
See the various API docs from `msgspec`.
extending from native types,
- https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
converters,
- https://jcristharif.com/msgspec/converters.html
- https://jcristharif.com/msgspec/api.html#msgspec.convert
`Raw` fields,
- https://jcristharif.com/msgspec/api.html#raw
- support for `.convert()` and `Raw`,
|_ https://jcristharif.com/msgspec/changelog.html
'''
import typing
from typing import (
Type,
Union,
)
def dec_type_union(
type_names: list[str],
) -> Type:
'''
Look up types by name, compile into a list and then create and
return a `typing.Union` from the full set.
'''
import importlib
types: list[Type] = []
for type_name in type_names:
for mod in [
typing,
importlib.import_module(__name__),
]:
if type_ref := getattr(
mod,
type_name,
False,
):
types.append(type_ref)
# special case handling only..
# ipc_pld_spec: Union[Type] = eval(
# pld_spec_str,
# {}, # globals
# {'typing': typing}, # locals
# )
return Union[*types]
def enc_type_union(
union_or_type: Union[Type]|Type,
) -> list[str]:
'''
Encode a type-union or single type to a list of type-name-strings
ready for IPC interchange.
'''
type_strs: list[str] = []
for typ in getattr(
union_or_type,
'__args__',
{union_or_type,},
):
type_strs.append(typ.__qualname__)
return type_strs

View File

@ -50,9 +50,7 @@ from tractor._exceptions import (
_mk_recv_mte, _mk_recv_mte,
pack_error, pack_error,
) )
from tractor._state import ( from tractor._state import current_ipc_ctx
current_ipc_ctx,
)
from ._codec import ( from ._codec import (
mk_dec, mk_dec,
MsgDec, MsgDec,
@ -80,7 +78,7 @@ if TYPE_CHECKING:
log = get_logger(__name__) log = get_logger(__name__)
_def_any_pldec: MsgDec[Any] = mk_dec(spec=Any) _def_any_pldec: MsgDec[Any] = mk_dec()
class PldRx(Struct): class PldRx(Struct):
@ -150,10 +148,6 @@ class PldRx(Struct):
exit. exit.
''' '''
# TODO, ensure we pull the current `MsgCodec`'s custom
# dec/enc_hook settings as well ?
# -[ ] see `._codec.mk_codec()` inputs
#
orig_dec: MsgDec = self._pld_dec orig_dec: MsgDec = self._pld_dec
limit_dec: MsgDec = mk_dec( limit_dec: MsgDec = mk_dec(
spec=spec, spec=spec,

View File

@ -599,15 +599,15 @@ def mk_msg_spec(
Msg[payload_type_union], Msg[payload_type_union],
Generic[PayloadT], Generic[PayloadT],
) )
# defstruct_bases: tuple = ( defstruct_bases: tuple = (
# Msg, # [payload_type_union], Msg, # [payload_type_union],
# # Generic[PayloadT], # Generic[PayloadT],
# # ^-XXX-^: not allowed? lul.. # ^-XXX-^: not allowed? lul..
# ) )
ipc_msg_types: list[Msg] = [] ipc_msg_types: list[Msg] = []
idx_msg_types: list[Msg] = [] idx_msg_types: list[Msg] = []
# defs_msg_types: list[Msg] = [] defs_msg_types: list[Msg] = []
nc_msg_types: list[Msg] = [] nc_msg_types: list[Msg] = []
for msgtype in __msg_types__: for msgtype in __msg_types__:
@ -625,7 +625,7 @@ def mk_msg_spec(
# TODO: wait why do we need the dynamic version here? # TODO: wait why do we need the dynamic version here?
# XXX ANSWER XXX -> BC INHERITANCE.. don't work w generics.. # XXX ANSWER XXX -> BC INHERITANCE.. don't work w generics..
# #
# NOTE previously bc msgtypes WERE NOT inheriting # NOTE previously bc msgtypes WERE NOT inheritting
# directly the `Generic[PayloadT]` type, the manual method # directly the `Generic[PayloadT]` type, the manual method
# of generic-paraming with `.__class_getitem__()` wasn't # of generic-paraming with `.__class_getitem__()` wasn't
# working.. # working..
@ -662,35 +662,38 @@ def mk_msg_spec(
# with `msgspec.structs.defstruct` # with `msgspec.structs.defstruct`
# XXX ALSO DOESN'T WORK # XXX ALSO DOESN'T WORK
# defstruct_msgtype = defstruct( defstruct_msgtype = defstruct(
# name=msgtype.__name__, name=msgtype.__name__,
# fields=[ fields=[
# ('cid', str), ('cid', str),
# # XXX doesn't seem to work.. # XXX doesn't seem to work..
# # ('pld', PayloadT), # ('pld', PayloadT),
('pld', payload_type_union),
],
bases=defstruct_bases,
)
defs_msg_types.append(defstruct_msgtype)
# ('pld', payload_type_union),
# ],
# bases=defstruct_bases,
# )
# defs_msg_types.append(defstruct_msgtype)
# assert index_paramed_msg_type == manual_paramed_msg_subtype # assert index_paramed_msg_type == manual_paramed_msg_subtype
# paramed_msg_type = manual_paramed_msg_subtype # paramed_msg_type = manual_paramed_msg_subtype
# ipc_payload_msgs_type_union |= index_paramed_msg_type # ipc_payload_msgs_type_union |= index_paramed_msg_type
idx_spec: Union[Type[Msg]] = Union[*idx_msg_types] idx_spec: Union[Type[Msg]] = Union[*idx_msg_types]
# def_spec: Union[Type[Msg]] = Union[*defs_msg_types] def_spec: Union[Type[Msg]] = Union[*defs_msg_types]
nc_spec: Union[Type[Msg]] = Union[*nc_msg_types] nc_spec: Union[Type[Msg]] = Union[*nc_msg_types]
specs: dict[str, Union[Type[Msg]]] = { specs: dict[str, Union[Type[Msg]]] = {
'indexed_generics': idx_spec, 'indexed_generics': idx_spec,
# 'defstruct': def_spec, 'defstruct': def_spec,
'types_new_class': nc_spec, 'types_new_class': nc_spec,
} }
msgtypes_table: dict[str, list[Msg]] = { msgtypes_table: dict[str, list[Msg]] = {
'indexed_generics': idx_msg_types, 'indexed_generics': idx_msg_types,
# 'defstruct': defs_msg_types, 'defstruct': defs_msg_types,
'types_new_class': nc_msg_types, 'types_new_class': nc_msg_types,
} }