Compare commits

..

No commits in common. "67f673bf36bf2625d1c1aaa488910f5b710ec30a" and "3ba46362a94175d8b3a1650c14b9104fac8bd7e9" have entirely different histories.

7 changed files with 173 additions and 600 deletions

View File

@ -7,6 +7,7 @@ B~)
''' '''
from typing import ( from typing import (
Any, Any,
_GenericAlias,
Type, Type,
Union, Union,
) )
@ -25,23 +26,20 @@ from msgspec import (
import pytest import pytest
import tractor import tractor
from tractor.msg import ( from tractor.msg import (
_codec, _def_msgspec_codec,
_ctxvar_MsgCodec, _ctxvar_MsgCodec,
NamespacePath, NamespacePath,
MsgCodec, MsgCodec,
mk_codec, mk_codec,
apply_codec, apply_codec,
current_codec, current_msgspec_codec,
) )
from tractor.msg import ( from tractor.msg import types
types,
)
from tractor import _state
from tractor.msg.types import ( from tractor.msg.types import (
# PayloadT, # PayloadT,
Msg, Msg,
Started, # Started,
mk_msg_spec, mk_msg_spec,
) )
import trio import trio
@ -62,110 +60,56 @@ def test_msg_spec_xor_pld_spec():
) )
# TODO: wrap these into `._codec` such that user can just pass
# a type table of some sort?
def enc_hook(obj: Any) -> Any:
if isinstance(obj, NamespacePath):
return str(obj)
else:
raise NotImplementedError(
f'Objects of type {type(obj)} are not supported'
)
def dec_hook(type: Type, obj: Any) -> Any:
print(f'type is: {type}')
if type is NamespacePath:
return NamespacePath(obj)
else:
raise NotImplementedError(
f'Objects of type {type(obj)} are not supported'
)
def ex_func(*args): def ex_func(*args):
print(f'ex_func({args})') print(f'ex_func({args})')
def mk_custom_codec( def mk_custom_codec(
pld_spec: Union[Type]|Any, ipc_msg_spec: Type[Any] = Any,
) -> MsgCodec: ) -> MsgCodec:
''' # apply custom hooks and set a `Decoder` which only
Create custom `msgpack` enc/dec-hooks and set a `Decoder` # loads `NamespacePath` types.
which only loads `NamespacePath` types.
'''
uid: tuple[str, str] = tractor.current_actor().uid
# XXX NOTE XXX: despite defining `NamespacePath` as a type
# field on our `Msg.pld`, we still need a enc/dec_hook() pair
# to cast to/from that type on the wire. See the docs:
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
def enc_nsp(obj: Any) -> Any:
match obj:
case NamespacePath():
print(
f'{uid}: `NamespacePath`-Only ENCODE?\n'
f'type: {type(obj)}\n'
f'obj: {obj}\n'
)
return str(obj)
logmsg: str = (
f'{uid}: Encoding `{obj}: <{type(obj)}>` not supported'
f'type: {type(obj)}\n'
f'obj: {obj}\n'
)
print(logmsg)
raise NotImplementedError(logmsg)
def dec_nsp(
type: Type,
obj: Any,
) -> Any:
print(
f'{uid}: CUSTOM DECODE\n'
f'input type: {type}\n'
f'obj: {obj}\n'
f'type(obj): `{type(obj).__class__}`\n'
)
nsp = None
# This never seems to hit?
if isinstance(obj, Msg):
print(f'Msg type: {obj}')
if (
type is NamespacePath
and isinstance(obj, str)
and ':' in obj
):
nsp = NamespacePath(obj)
if nsp:
print(f'Returning NSP instance: {nsp}')
return nsp
logmsg: str = (
f'{uid}: Decoding `{obj}: <{type(obj)}>` not supported'
f'input type: {type(obj)}\n'
f'obj: {obj}\n'
f'type(obj): `{type(obj).__class__}`\n'
)
print(logmsg)
raise NotImplementedError(logmsg)
nsp_codec: MsgCodec = mk_codec( nsp_codec: MsgCodec = mk_codec(
ipc_pld_spec=pld_spec, ipc_msg_spec=ipc_msg_spec,
enc_hook=enc_hook,
# NOTE XXX: the encode hook MUST be used no matter what since dec_hook=dec_hook,
# our `NamespacePath` is not any of a `Any` native type nor
# a `msgspec.Struct` subtype - so `msgspec` has no way to know
# how to encode it unless we provide the custom hook.
#
# AGAIN that is, regardless of whether we spec an
# `Any`-decoded-pld the enc has no knowledge (by default)
# how to enc `NamespacePath` (nsp), so we add a custom
# hook to do that ALWAYS.
enc_hook=enc_nsp,
# XXX NOTE: pretty sure this is mutex with the `type=` to
# `Decoder`? so it won't work in tandem with the
# `ipc_pld_spec` passed above?
dec_hook=dec_nsp,
) )
# TODO: validate `MsgCodec` interface/semantics?
# -[ ] simple field tests to ensure caching + reset is workin?
# -[ ] custom / changing `.decoder()` calls?
#
# dec = nsp_codec.decoder(
# types=NamespacePath,
# )
# assert nsp_codec.dec is dec
return nsp_codec return nsp_codec
@tractor.context @tractor.context
async def send_back_nsp( async def send_back_nsp(
ctx: Context, ctx: tractor.Context,
expect_debug: bool,
use_any_spec: bool,
) -> None: ) -> None:
''' '''
@ -173,65 +117,28 @@ async def send_back_nsp(
and ensure we can round trip a func ref with our parent. and ensure we can round trip a func ref with our parent.
''' '''
# debug mode sanity check task: trio.Task = trio.lowlevel.current_task()
assert expect_debug == _state.debug_mode() task_ctx: Context = task.context
assert _ctxvar_MsgCodec not in task_ctx
# task: trio.Task = trio.lowlevel.current_task() nsp_codec: MsgCodec = mk_custom_codec()
# TreeVar
# curr_codec = _ctxvar_MsgCodec.get_in(task)
# ContextVar
# task_ctx: Context = task.context
# assert _ctxvar_MsgCodec not in task_ctx
curr_codec = _ctxvar_MsgCodec.get()
assert curr_codec is _codec._def_tractor_codec
if use_any_spec:
pld_spec = Any
else:
# NOTE: don't need the |None here since
# the parent side will never send `None` like
# we do here in the implicit return at the end of this
# `@context` body.
pld_spec = NamespacePath # |None
nsp_codec: MsgCodec = mk_custom_codec(
pld_spec=pld_spec,
)
with apply_codec(nsp_codec) as codec: with apply_codec(nsp_codec) as codec:
chk_codec_applied( chk_codec_applied(
custom_codec=nsp_codec, custom_codec=nsp_codec,
enter_value=codec, enter_value=codec,
) )
# ensure roundtripping works locally
nsp = NamespacePath.from_ref(ex_func) nsp = NamespacePath.from_ref(ex_func)
wire_bytes: bytes = nsp_codec.encode(
Started(
cid=ctx.cid,
pld=nsp
)
)
msg: Started = nsp_codec.decode(wire_bytes)
pld = msg.pld
assert pld == nsp
await ctx.started(nsp) await ctx.started(nsp)
async with ctx.open_stream() as ipc: async with ctx.open_stream() as ipc:
async for msg in ipc: async for msg in ipc:
if use_any_spec: assert msg == f'{__name__}:ex_func'
assert msg == f'{__name__}:ex_func'
# TODO: as per below # TODO: as per below
# assert isinstance(msg, NamespacePath) # assert isinstance(msg, NamespacePath)
assert isinstance(msg, str) assert isinstance(msg, str)
else:
assert isinstance(msg, NamespacePath)
await ipc.send(msg)
def chk_codec_applied( def chk_codec_applied(
@ -239,20 +146,11 @@ def chk_codec_applied(
enter_value: MsgCodec, enter_value: MsgCodec,
) -> MsgCodec: ) -> MsgCodec:
# task: trio.Task = trio.lowlevel.current_task() task: trio.Task = trio.lowlevel.current_task()
task_ctx: Context = task.context
# TreeVar assert _ctxvar_MsgCodec in task_ctx
# curr_codec = _ctxvar_MsgCodec.get_in(task) curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec]
# ContextVar
# task_ctx: Context = task.context
# assert _ctxvar_MsgCodec in task_ctx
# curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec]
# RunVar
curr_codec: MsgCodec = _ctxvar_MsgCodec.get()
last_read_codec = _ctxvar_MsgCodec.get()
assert curr_codec is last_read_codec
assert ( assert (
# returned from `mk_codec()` # returned from `mk_codec()`
@ -265,31 +163,14 @@ def chk_codec_applied(
curr_codec is curr_codec is
# public API for all of the above # public API for all of the above
current_codec() current_msgspec_codec()
# the default `msgspec` settings # the default `msgspec` settings
is not _codec._def_msgspec_codec is not _def_msgspec_codec
is not _codec._def_tractor_codec
) )
@pytest.mark.parametrize( def test_codec_hooks_mod():
'ipc_pld_spec',
[
# _codec._def_msgspec_codec,
Any,
# _codec._def_tractor_codec,
NamespacePath|None,
],
ids=[
'any_type',
'nsp_type',
]
)
def test_codec_hooks_mod(
debug_mode: bool,
ipc_pld_spec: Union[Type]|Any,
):
''' '''
Audit the `.msg.MsgCodec` override apis details given our impl Audit the `.msg.MsgCodec` override apis details given our impl
uses `contextvars` to accomplish per `trio` task codec uses `contextvars` to accomplish per `trio` task codec
@ -297,21 +178,11 @@ def test_codec_hooks_mod(
''' '''
async def main(): async def main():
task: trio.Task = trio.lowlevel.current_task()
task_ctx: Context = task.context
assert _ctxvar_MsgCodec not in task_ctx
# task: trio.Task = trio.lowlevel.current_task() async with tractor.open_nursery() as an:
# ContextVar
# task_ctx: Context = task.context
# assert _ctxvar_MsgCodec not in task_ctx
# TreeVar
# def_codec: MsgCodec = _ctxvar_MsgCodec.get_in(task)
def_codec = _ctxvar_MsgCodec.get()
assert def_codec is _codec._def_tractor_codec
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
p: tractor.Portal = await an.start_actor( p: tractor.Portal = await an.start_actor(
'sub', 'sub',
enable_modules=[__name__], enable_modules=[__name__],
@ -321,9 +192,7 @@ def test_codec_hooks_mod(
# - codec not modified -> decode nsp as `str` # - codec not modified -> decode nsp as `str`
# - codec modified with hooks -> decode nsp as # - codec modified with hooks -> decode nsp as
# `NamespacePath` # `NamespacePath`
nsp_codec: MsgCodec = mk_custom_codec( nsp_codec: MsgCodec = mk_custom_codec()
pld_spec=ipc_pld_spec,
)
with apply_codec(nsp_codec) as codec: with apply_codec(nsp_codec) as codec:
chk_codec_applied( chk_codec_applied(
custom_codec=nsp_codec, custom_codec=nsp_codec,
@ -333,22 +202,9 @@ def test_codec_hooks_mod(
async with ( async with (
p.open_context( p.open_context(
send_back_nsp, send_back_nsp,
# TODO: send the original nsp here and
# test with `limit_msg_spec()` above?
expect_debug=debug_mode,
use_any_spec=(ipc_pld_spec==Any),
) as (ctx, first), ) as (ctx, first),
ctx.open_stream() as ipc, ctx.open_stream() as ipc,
): ):
if ipc_pld_spec is NamespacePath:
assert isinstance(first, NamespacePath)
print(
'root: ENTERING CONTEXT BLOCK\n'
f'type(first): {type(first)}\n'
f'first: {first}\n'
)
# ensure codec is still applied across # ensure codec is still applied across
# `tractor.Context` + its embedded nursery. # `tractor.Context` + its embedded nursery.
chk_codec_applied( chk_codec_applied(
@ -356,46 +212,23 @@ def test_codec_hooks_mod(
enter_value=codec, enter_value=codec,
) )
first_nsp = NamespacePath(first) assert first == f'{__name__}:ex_func'
# ensure roundtripping works
wire_bytes: bytes = nsp_codec.encode(
Started(
cid=ctx.cid,
pld=first_nsp
)
)
msg: Started = nsp_codec.decode(wire_bytes)
pld = msg.pld
assert pld == first_nsp
# try a manual decode of the started msg+pld
# TODO: actually get the decoder loading # TODO: actually get the decoder loading
# to native once we spec our SCIPP msgspec # to native once we spec our SCIPP msgspec
# (structurred-conc-inter-proc-protocol) # (structurred-conc-inter-proc-protocol)
# implemented as per, # implemented as per,
# https://github.com/goodboy/tractor/issues/36 # https://github.com/goodboy/tractor/issues/36
# #
if ipc_pld_spec is NamespacePath: # assert isinstance(first, NamespacePath)
assert isinstance(first, NamespacePath) assert isinstance(first, str)
# `Any`-payload-spec case
else:
assert isinstance(first, str)
assert first == f'{__name__}:ex_func'
await ipc.send(first) await ipc.send(first)
with trio.move_on_after(.6): with trio.move_on_after(1):
async for msg in ipc: async for msg in ipc:
print(msg)
# TODO: as per above # TODO: as per above
# assert isinstance(msg, NamespacePath) # assert isinstance(msg, NamespacePath)
assert isinstance(msg, str) assert isinstance(msg, str)
await ipc.send(msg)
await trio.sleep(0.1)
await p.cancel_actor() await p.cancel_actor()

View File

@ -845,10 +845,7 @@ async def keep_sending_from_callee(
('caller', 1, never_open_stream), ('caller', 1, never_open_stream),
('callee', 0, keep_sending_from_callee), ('callee', 0, keep_sending_from_callee),
], ],
ids=[ ids='overrun_condition={}'.format,
('caller_1buf_never_open_stream'),
('callee_0buf_keep_sending_from_callee'),
]
) )
def test_one_end_stream_not_opened( def test_one_end_stream_not_opened(
overrun_by: tuple[str, int, Callable], overrun_by: tuple[str, int, Callable],
@ -872,30 +869,29 @@ def test_one_end_stream_not_opened(
enable_modules=[__name__], enable_modules=[__name__],
) )
with trio.fail_after(1): async with portal.open_context(
async with portal.open_context( entrypoint,
entrypoint, ) as (ctx, sent):
) as (ctx, sent): assert sent is None
assert sent is None
if 'caller' in overrunner: if 'caller' in overrunner:
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
# itersend +1 msg more then the buffer size # itersend +1 msg more then the buffer size
# to cause the most basic overrun. # to cause the most basic overrun.
for i in range(buf_size): for i in range(buf_size):
print(f'sending {i}') print(f'sending {i}')
await stream.send(i) await stream.send(i)
else: else:
# expect overrun error to be relayed back # expect overrun error to be relayed back
# and this sleep interrupted # and this sleep interrupted
await trio.sleep_forever() await trio.sleep_forever()
else: else:
# callee overruns caller case so we do nothing here # callee overruns caller case so we do nothing here
await trio.sleep_forever() await trio.sleep_forever()
await portal.cancel_actor() await portal.cancel_actor()

View File

@ -190,14 +190,11 @@ class Lock:
is_trio_main = ( is_trio_main = (
# TODO: since this is private, @oremanj says # TODO: since this is private, @oremanj says
# we should just copy the impl for now.. # we should just copy the impl for now..
(is_main_thread := trio._util.is_main_thread()) trio._util.is_main_thread()
and and
(async_lib := sniffio.current_async_library()) == 'trio' (async_lib := sniffio.current_async_library()) == 'trio'
) )
if ( if not is_trio_main:
not is_trio_main
and is_main_thread
):
log.warning( log.warning(
f'Current async-lib detected by `sniffio`: {async_lib}\n' f'Current async-lib detected by `sniffio`: {async_lib}\n'
) )

View File

@ -31,24 +31,25 @@ from ._codec import (
apply_codec as apply_codec, apply_codec as apply_codec,
mk_codec as mk_codec, mk_codec as mk_codec,
MsgCodec as MsgCodec, MsgCodec as MsgCodec,
current_codec as current_codec, current_msgspec_codec as current_msgspec_codec,
) )
from .types import ( from .types import (
Msg as Msg, Msg as Msg,
Aid as Aid, Start as Start, # with pld
SpawnSpec as SpawnSpec, FuncSpec as FuncSpec,
Start as Start, StartAck as StartAck, # with pld
StartAck as StartAck, IpcCtxSpec as IpcCtxSpec,
Started as Started, Started as Started,
Yield as Yield, Yield as Yield,
Stop as Stop, Stop as Stop,
Return as Return, Return as Return,
Error as Error, Error as Error, # with pld
ErrorData as ErrorData,
# full msg spec set # full msg spec set
__spec__ as __spec__, __spec__ as __spec__,

View File

@ -30,13 +30,13 @@ ToDo: backends we prolly should offer:
''' '''
from __future__ import annotations from __future__ import annotations
from contextvars import (
ContextVar,
Token,
)
from contextlib import ( from contextlib import (
contextmanager as cm, contextmanager as cm,
) )
# from contextvars import (
# ContextVar,
# Token,
# )
from typing import ( from typing import (
Any, Any,
Callable, Callable,
@ -47,12 +47,6 @@ from types import ModuleType
import msgspec import msgspec
from msgspec import msgpack from msgspec import msgpack
from trio.lowlevel import (
RunVar,
RunVarToken,
)
# TODO: see notes below from @mikenerone..
# from tricycle import TreeVar
from tractor.msg.pretty_struct import Struct from tractor.msg.pretty_struct import Struct
from tractor.msg.types import ( from tractor.msg.types import (
@ -78,9 +72,6 @@ class MsgCodec(Struct):
''' '''
A IPC msg interchange format lib's encoder + decoder pair. A IPC msg interchange format lib's encoder + decoder pair.
Pretty much nothing more then delegation to underlying
`msgspec.<interchange-protocol>.Encoder/Decoder`s for now.
''' '''
_enc: msgpack.Encoder _enc: msgpack.Encoder
_dec: msgpack.Decoder _dec: msgpack.Decoder
@ -95,6 +86,11 @@ class MsgCodec(Struct):
lib: ModuleType = msgspec lib: ModuleType = msgspec
# ad-hoc type extensions
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
enc_hook: Callable[[Any], Any]|None = None # coder
dec_hook: Callable[[type, Any], Any]|None = None # decoder
# TODO: a sub-decoder system as well? # TODO: a sub-decoder system as well?
# payload_msg_specs: Union[Type[Struct]] = Any # payload_msg_specs: Union[Type[Struct]] = Any
# see related comments in `.msg.types` # see related comments in `.msg.types`
@ -308,8 +304,7 @@ def mk_codec(
libname: str = 'msgspec', libname: str = 'msgspec',
# proxy as `Struct(**kwargs)` for ad-hoc type extensions # proxy as `Struct(**kwargs)`
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
# ------ - ------ # ------ - ------
dec_hook: Callable|None = None, dec_hook: Callable|None = None,
enc_hook: Callable|None = None, enc_hook: Callable|None = None,
@ -394,52 +389,14 @@ def mk_codec(
# no custom structs, hooks or other special types. # no custom structs, hooks or other special types.
_def_msgspec_codec: MsgCodec = mk_codec(ipc_msg_spec=Any) _def_msgspec_codec: MsgCodec = mk_codec(ipc_msg_spec=Any)
# The built-in IPC `Msg` spec. # NOTE: provides for per-`trio.Task` specificity of the
# Our composing "shuttle" protocol which allows `tractor`-app code
# to use any `msgspec` supported type as the `Msg.pld` payload,
# https://jcristharif.com/msgspec/supported-types.html
#
_def_tractor_codec: MsgCodec = mk_codec(
ipc_pld_spec=Any,
)
# TODO: IDEALLY provides for per-`trio.Task` specificity of the
# IPC msging codec used by the transport layer when doing # IPC msging codec used by the transport layer when doing
# `Channel.send()/.recv()` of wire data. # `Channel.send()/.recv()` of wire data.
_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
# ContextVar-TODO: DIDN'T WORK, kept resetting in every new task to default!?
# _ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
# TreeVar-TODO: DIDN'T WORK, kept resetting in every new embedded nursery
# even though it's supposed to inherit from a parent context ???
#
# _ctxvar_MsgCodec: TreeVar[MsgCodec] = TreeVar(
#
# ^-NOTE-^: for this to work see the mods by @mikenerone from `trio` gitter:
#
# 22:02:54 <mikenerone> even for regular contextvars, all you have to do is:
# `task: Task = trio.lowlevel.current_task()`
# `task.parent_nursery.parent_task.context.run(my_ctx_var.set, new_value)`
#
# From a comment in his prop code he couldn't share outright:
# 1. For every TreeVar set in the current task (which covers what
# we need from SynchronizerFacade), walk up the tree until the
# root or finding one where the TreeVar is already set, setting
# it in all of the contexts along the way.
# 2. For each of those, we also forcibly set the values that are
# pending for child nurseries that have not yet accessed the
# TreeVar.
# 3. We similarly set the pending values for the child nurseries
# of the *current* task.
#
# TODO: STOP USING THIS, since it's basically a global and won't
# allow sub-IPC-ctxs to limit the msg-spec however desired..
_ctxvar_MsgCodec: MsgCodec = RunVar(
'msgspec_codec', 'msgspec_codec',
# TODO: move this to our new `Msg`-spec! # TODO: move this to our new `Msg`-spec!
# default=_def_msgspec_codec, default=_def_msgspec_codec,
default=_def_tractor_codec,
) )
@ -453,36 +410,15 @@ def apply_codec(
runtime context such that all IPC msgs are processed runtime context such that all IPC msgs are processed
with it for that task. with it for that task.
Uses a `tricycle.TreeVar` to ensure the scope of the codec
matches the `@cm` block and DOES NOT change to the original
(default) value in new tasks (as it does for `ContextVar`).
See the docs:
- https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
- https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
''' '''
orig: MsgCodec = _ctxvar_MsgCodec.get() token: Token = _ctxvar_MsgCodec.set(codec)
assert orig is not codec
token: RunVarToken = _ctxvar_MsgCodec.set(codec)
# TODO: for TreeVar approach, see docs for @cm `.being()` API:
# https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
# try:
# with _ctxvar_MsgCodec.being(codec):
# new = _ctxvar_MsgCodec.get()
# assert new is codec
# yield codec
try: try:
yield _ctxvar_MsgCodec.get() yield _ctxvar_MsgCodec.get()
finally: finally:
_ctxvar_MsgCodec.reset(token) _ctxvar_MsgCodec.reset(token)
assert _ctxvar_MsgCodec.get() is orig
def current_msgspec_codec() -> MsgCodec:
def current_codec() -> MsgCodec:
''' '''
Return the current `trio.Task.context`'s value Return the current `trio.Task.context`'s value
for `msgspec_codec` used by `Channel.send/.recv()` for `msgspec_codec` used by `Channel.send/.recv()`
@ -513,6 +449,5 @@ def limit_msg_spec(
payload_types=payload_types, payload_types=payload_types,
**codec_kwargs, **codec_kwargs,
) )
with apply_codec(msgspec_codec) as applied_codec: with apply_codec(msgspec_codec):
assert applied_codec is msgspec_codec
yield msgspec_codec yield msgspec_codec

View File

@ -80,28 +80,6 @@ class DiffDump(UserList):
return repstr return repstr
def iter_fields(struct: Struct) -> Iterator[
tuple[
structs.FieldIinfo,
str,
Any,
]
]:
'''
Iterate over all non-@property fields of this struct.
'''
fi: structs.FieldInfo
for fi in structs.fields(struct):
key: str = fi.name
val: Any = getattr(struct, key)
yield (
fi,
key,
val,
)
class Struct( class Struct(
_Struct, _Struct,
@ -113,6 +91,23 @@ class Struct(
A "human friendlier" (aka repl buddy) struct subtype. A "human friendlier" (aka repl buddy) struct subtype.
''' '''
def _sin_props(self) -> Iterator[
tuple[
structs.FieldIinfo,
str,
Any,
]
]:
'''
Iterate over all non-@property fields of this struct.
'''
fi: structs.FieldInfo
for fi in structs.fields(self):
key: str = fi.name
val: Any = getattr(self, key)
yield fi, key, val
def to_dict( def to_dict(
self, self,
include_non_members: bool = True, include_non_members: bool = True,
@ -135,7 +130,7 @@ class Struct(
# added as type-defined `@property` methods! # added as type-defined `@property` methods!
sin_props: dict = {} sin_props: dict = {}
fi: structs.FieldInfo fi: structs.FieldInfo
for fi, k, v in iter_fields(self): for fi, k, v in self._sin_props():
sin_props[k] = asdict[k] sin_props[k] = asdict[k]
return sin_props return sin_props
@ -164,7 +159,7 @@ class Struct(
fi: structs.FieldInfo fi: structs.FieldInfo
k: str k: str
v: Any v: Any
for fi, k, v in iter_fields(self): for fi, k, v in self._sin_props():
# TODO: how can we prefer `Literal['option1', 'option2, # TODO: how can we prefer `Literal['option1', 'option2,
# ..]` over .__name__ == `Literal` but still get only the # ..]` over .__name__ == `Literal` but still get only the

View File

@ -26,7 +26,6 @@ from __future__ import annotations
import types import types
from typing import ( from typing import (
Any, Any,
Callable,
Generic, Generic,
Literal, Literal,
Type, Type,
@ -38,12 +37,8 @@ from msgspec import (
defstruct, defstruct,
# field, # field,
Struct, Struct,
# UNSET, UNSET,
# UnsetType, UnsetType,
)
from tractor.msg import (
pretty_struct,
) )
# type variable for the boxed payload field `.pld` # type variable for the boxed payload field `.pld`
@ -53,19 +48,11 @@ PayloadT = TypeVar('PayloadT')
class Msg( class Msg(
Struct, Struct,
Generic[PayloadT], Generic[PayloadT],
# https://jcristharif.com/msgspec/structs.html#tagged-unions
tag=True, tag=True,
tag_field='msg_type', tag_field='msg_type',
# https://jcristharif.com/msgspec/structs.html#field-ordering # eq=True,
# kw_only=True,
# https://jcristharif.com/msgspec/structs.html#equality-and-order
# order=True, # order=True,
# https://jcristharif.com/msgspec/structs.html#encoding-decoding-as-arrays
# as_array=True,
): ):
''' '''
The "god" boxing msg type. The "god" boxing msg type.
@ -103,53 +90,6 @@ class Msg(
pld: PayloadT pld: PayloadT
class Aid(
Struct,
tag=True,
tag_field='msg_type',
):
'''
Actor-identity msg.
Initial contact exchange enabling an actor "mailbox handshake"
delivering the peer identity (and maybe eventually contact)
info.
Used by discovery protocol to register actors as well as
conduct the initial comms (capability) filtering.
'''
name: str
uuid: str
# TODO: use built-in support for UUIDs?
# -[ ] `uuid.UUID` which has multi-protocol support
# https://jcristharif.com/msgspec/supported-types.html#uuid
class SpawnSpec(
pretty_struct.Struct,
tag=True,
tag_field='msg_type',
):
'''
Initial runtime spec handed down from a spawning parent to its
child subactor immediately following first contact via an
`Aid` msg.
'''
_parent_main_data: dict
_runtime_vars: dict[str, Any]
# module import capability
enable_modules: dict[str, str]
# TODO: not just sockaddr pairs?
# -[ ] abstract into a `TransportAddr` type?
reg_addrs: list[tuple[str, int]]
bind_addrs: list[tuple[str, int]]
# TODO: caps based RPC support in the payload? # TODO: caps based RPC support in the payload?
# #
# -[ ] integration with our ``enable_modules: list[str]`` caps sys. # -[ ] integration with our ``enable_modules: list[str]`` caps sys.
@ -165,31 +105,18 @@ class SpawnSpec(
# #
# -[ ] can we combine .ns + .func into a native `NamespacePath` field? # -[ ] can we combine .ns + .func into a native `NamespacePath` field?
# #
# -[ ] better name, like `Call/TaskInput`? # -[ ]better name, like `Call/TaskInput`?
# #
# -[ ] XXX a debugger lock msg transaction with payloads like, class FuncSpec(Struct):
# child -> `.pld: DebugLock` -> root ns: str
# child <- `.pld: DebugLocked` <- root func: str
# child -> `.pld: DebugRelease` -> root
# kwargs: dict
# WHY => when a pld spec is provided it might not allow for uid: str # (calling) actor-id
# debug mode msgs as they currently are (using plain old `pld.
# str` payloads) so we only when debug_mode=True we need to
# union in this debugger payload set?
#
# mk_msg_spec(
# MyPldSpec,
# debug_mode=True,
# ) -> (
# Union[MyPldSpec]
# | Union[DebugLock, DebugLocked, DebugRelease]
# )
class Start( class Start(
Struct, Msg,
tag=True,
tag_field='msg_type',
): ):
''' '''
Initial request to remotely schedule an RPC `trio.Task` via Initial request to remotely schedule an RPC `trio.Task` via
@ -207,26 +134,14 @@ class Start(
- `Context.open_context()` - `Context.open_context()`
''' '''
cid: str pld: FuncSpec
ns: str
func: str
kwargs: dict
uid: tuple[str, str] # (calling) actor-id
class StartAck( class IpcCtxSpec(Struct):
Struct,
tag=True,
tag_field='msg_type',
):
''' '''
Init response to a `Cmd` request indicating the far An inter-actor-`trio.Task`-comms `Context` spec.
end's RPC spec, namely its callable "type".
''' '''
cid: str
# TODO: maybe better names for all these? # TODO: maybe better names for all these?
# -[ ] obvi ^ would need sync with `._rpc` # -[ ] obvi ^ would need sync with `._rpc`
functype: Literal[ functype: Literal[
@ -245,6 +160,18 @@ class StartAck(
# msgspec: MsgSpec # msgspec: MsgSpec
class StartAck(
Msg,
Generic[PayloadT],
):
'''
Init response to a `Cmd` request indicating the far
end's RPC callable "type".
'''
pld: IpcCtxSpec
class Started( class Started(
Msg, Msg,
Generic[PayloadT], Generic[PayloadT],
@ -275,19 +202,13 @@ class Yield(
pld: PayloadT pld: PayloadT
class Stop( class Stop(Msg):
Struct,
tag=True,
tag_field='msg_type',
):
''' '''
Stream termination signal much like an IPC version Stream termination signal much like an IPC version
of `StopAsyncIteration`. of `StopAsyncIteration`.
''' '''
cid: str pld: UnsetType = UNSET
# TODO: do we want to support a payload on stop?
# pld: UnsetType = UNSET
class Return( class Return(
@ -302,33 +223,32 @@ class Return(
pld: PayloadT pld: PayloadT
class Error( class ErrorData(Struct):
Struct,
tag=True,
tag_field='msg_type',
):
''' '''
A pkt that wraps `RemoteActorError`s for relay and raising. Remote actor error meta-data as needed originally by
Fields are 1-to-1 meta-data as needed originally by
`RemoteActorError.msgdata: dict`. `RemoteActorError.msgdata: dict`.
''' '''
src_uid: tuple[str, str] src_uid: str
src_type_str: str src_type_str: str
boxed_type_str: str boxed_type_str: str
relay_path: list[tuple[str, str]]
relay_path: list[str]
tb_str: str tb_str: str
cid: str|None = None
# TODO: use UNSET or don't include them via
#
# `ContextCancelled` # `ContextCancelled`
canceller: tuple[str, str]|None = None canceller: str|None = None
# `StreamOverrun` # `StreamOverrun`
sender: tuple[str, str]|None = None sender: str|None = None
class Error(Msg):
'''
A pkt that wraps `RemoteActorError`s for relay.
'''
pld: ErrorData
# TODO: should be make a msg version of `ContextCancelled?` # TODO: should be make a msg version of `ContextCancelled?`
@ -345,12 +265,6 @@ class Error(
# approx order of the IPC txn-state spaces. # approx order of the IPC txn-state spaces.
__spec__: list[Msg] = [ __spec__: list[Msg] = [
# identity handshake
Aid,
# spawn specification from parent
SpawnSpec,
# inter-actor RPC initiation # inter-actor RPC initiation
Start, Start,
StartAck, StartAck,
@ -366,8 +280,6 @@ __spec__: list[Msg] = [
] ]
_runtime_spec_msgs: list[Msg] = [ _runtime_spec_msgs: list[Msg] = [
Aid,
SpawnSpec,
Start, Start,
StartAck, StartAck,
Stop, Stop,
@ -531,99 +443,3 @@ def mk_msg_spec(
pld_spec | runtime_spec, pld_spec | runtime_spec,
msgtypes_table[spec_build_method] + ipc_msg_types, msgtypes_table[spec_build_method] + ipc_msg_types,
) )
# TODO: make something similar to this inside `._codec` such that
# user can just pass a type table of some sort?
# def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]:
# '''
# Deliver a `enc_hook()`/`dec_hook()` pair which does
# manual convertion from our above native `Msg` set
# to `dict` equivalent (wire msgs) in order to keep legacy compat
# with the original runtime implementation.
# Note: this is is/was primarly used while moving the core
# runtime over to using native `Msg`-struct types wherein we
# start with the send side emitting without loading
# a typed-decoder and then later flipping the switch over to
# load to the native struct types once all runtime usage has
# been adjusted appropriately.
# '''
# def enc_to_dict(msg: Any) -> Any:
# '''
# Encode `Msg`-structs to `dict` msgs instead
# of using `msgspec.msgpack.Decoder.type`-ed
# features.
# '''
# match msg:
# case Start():
# dctmsg: dict = pretty_struct.Struct.to_dict(
# msg
# )['pld']
# case Error():
# dctmsg: dict = pretty_struct.Struct.to_dict(
# msg
# )['pld']
# return {'error': dctmsg}
# def dec_from_dict(
# type: Type,
# obj: Any,
# ) -> Any:
# '''
# Decode to `Msg`-structs from `dict` msgs instead
# of using `msgspec.msgpack.Decoder.type`-ed
# features.
# '''
# cid: str = obj.get('cid')
# match obj:
# case {'cmd': pld}:
# return Start(
# cid=cid,
# pld=pld,
# )
# case {'functype': pld}:
# return StartAck(
# cid=cid,
# functype=pld,
# # pld=IpcCtxSpec(
# # functype=pld,
# # ),
# )
# case {'started': pld}:
# return Started(
# cid=cid,
# pld=pld,
# )
# case {'yield': pld}:
# return Yield(
# cid=obj['cid'],
# pld=pld,
# )
# case {'stop': pld}:
# return Stop(
# cid=cid,
# )
# case {'return': pld}:
# return Return(
# cid=cid,
# pld=pld,
# )
# case {'error': pld}:
# return Error(
# cid=cid,
# pld=ErrorData(
# **pld
# ),
# )
# return (
# # enc_to_dict,
# dec_from_dict,
# )