Compare commits
3 Commits
67f673bf36
...
f2ce4a3469
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | f2ce4a3469 | |
Tyler Goodlet | 3aa964315a | |
Tyler Goodlet | f3ca8608d5 |
|
@ -7,7 +7,6 @@ B~)
|
||||||
'''
|
'''
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
_GenericAlias,
|
|
||||||
Type,
|
Type,
|
||||||
Union,
|
Union,
|
||||||
)
|
)
|
||||||
|
@ -26,20 +25,23 @@ from msgspec import (
|
||||||
import pytest
|
import pytest
|
||||||
import tractor
|
import tractor
|
||||||
from tractor.msg import (
|
from tractor.msg import (
|
||||||
_def_msgspec_codec,
|
_codec,
|
||||||
_ctxvar_MsgCodec,
|
_ctxvar_MsgCodec,
|
||||||
|
|
||||||
NamespacePath,
|
NamespacePath,
|
||||||
MsgCodec,
|
MsgCodec,
|
||||||
mk_codec,
|
mk_codec,
|
||||||
apply_codec,
|
apply_codec,
|
||||||
current_msgspec_codec,
|
current_codec,
|
||||||
)
|
)
|
||||||
from tractor.msg import types
|
from tractor.msg import (
|
||||||
|
types,
|
||||||
|
)
|
||||||
|
from tractor import _state
|
||||||
from tractor.msg.types import (
|
from tractor.msg.types import (
|
||||||
# PayloadT,
|
# PayloadT,
|
||||||
Msg,
|
Msg,
|
||||||
# Started,
|
Started,
|
||||||
mk_msg_spec,
|
mk_msg_spec,
|
||||||
)
|
)
|
||||||
import trio
|
import trio
|
||||||
|
@ -60,56 +62,110 @@ def test_msg_spec_xor_pld_spec():
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# TODO: wrap these into `._codec` such that user can just pass
|
|
||||||
# a type table of some sort?
|
|
||||||
def enc_hook(obj: Any) -> Any:
|
|
||||||
if isinstance(obj, NamespacePath):
|
|
||||||
return str(obj)
|
|
||||||
else:
|
|
||||||
raise NotImplementedError(
|
|
||||||
f'Objects of type {type(obj)} are not supported'
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def dec_hook(type: Type, obj: Any) -> Any:
|
|
||||||
print(f'type is: {type}')
|
|
||||||
if type is NamespacePath:
|
|
||||||
return NamespacePath(obj)
|
|
||||||
else:
|
|
||||||
raise NotImplementedError(
|
|
||||||
f'Objects of type {type(obj)} are not supported'
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def ex_func(*args):
|
def ex_func(*args):
|
||||||
print(f'ex_func({args})')
|
print(f'ex_func({args})')
|
||||||
|
|
||||||
|
|
||||||
def mk_custom_codec(
|
def mk_custom_codec(
|
||||||
ipc_msg_spec: Type[Any] = Any,
|
pld_spec: Union[Type]|Any,
|
||||||
) -> MsgCodec:
|
|
||||||
# apply custom hooks and set a `Decoder` which only
|
|
||||||
# loads `NamespacePath` types.
|
|
||||||
nsp_codec: MsgCodec = mk_codec(
|
|
||||||
ipc_msg_spec=ipc_msg_spec,
|
|
||||||
enc_hook=enc_hook,
|
|
||||||
dec_hook=dec_hook,
|
|
||||||
)
|
|
||||||
|
|
||||||
# TODO: validate `MsgCodec` interface/semantics?
|
) -> MsgCodec:
|
||||||
# -[ ] simple field tests to ensure caching + reset is workin?
|
'''
|
||||||
# -[ ] custom / changing `.decoder()` calls?
|
Create custom `msgpack` enc/dec-hooks and set a `Decoder`
|
||||||
#
|
which only loads `NamespacePath` types.
|
||||||
# dec = nsp_codec.decoder(
|
|
||||||
# types=NamespacePath,
|
'''
|
||||||
# )
|
uid: tuple[str, str] = tractor.current_actor().uid
|
||||||
# assert nsp_codec.dec is dec
|
|
||||||
|
# XXX NOTE XXX: despite defining `NamespacePath` as a type
|
||||||
|
# field on our `Msg.pld`, we still need a enc/dec_hook() pair
|
||||||
|
# to cast to/from that type on the wire. See the docs:
|
||||||
|
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
|
||||||
|
|
||||||
|
def enc_nsp(obj: Any) -> Any:
|
||||||
|
match obj:
|
||||||
|
case NamespacePath():
|
||||||
|
print(
|
||||||
|
f'{uid}: `NamespacePath`-Only ENCODE?\n'
|
||||||
|
f'type: {type(obj)}\n'
|
||||||
|
f'obj: {obj}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
return str(obj)
|
||||||
|
|
||||||
|
logmsg: str = (
|
||||||
|
f'{uid}: Encoding `{obj}: <{type(obj)}>` not supported'
|
||||||
|
f'type: {type(obj)}\n'
|
||||||
|
f'obj: {obj}\n'
|
||||||
|
)
|
||||||
|
print(logmsg)
|
||||||
|
raise NotImplementedError(logmsg)
|
||||||
|
|
||||||
|
def dec_nsp(
|
||||||
|
type: Type,
|
||||||
|
obj: Any,
|
||||||
|
|
||||||
|
) -> Any:
|
||||||
|
print(
|
||||||
|
f'{uid}: CUSTOM DECODE\n'
|
||||||
|
f'input type: {type}\n'
|
||||||
|
f'obj: {obj}\n'
|
||||||
|
f'type(obj): `{type(obj).__class__}`\n'
|
||||||
|
)
|
||||||
|
nsp = None
|
||||||
|
|
||||||
|
# This never seems to hit?
|
||||||
|
if isinstance(obj, Msg):
|
||||||
|
print(f'Msg type: {obj}')
|
||||||
|
|
||||||
|
if (
|
||||||
|
type is NamespacePath
|
||||||
|
and isinstance(obj, str)
|
||||||
|
and ':' in obj
|
||||||
|
):
|
||||||
|
nsp = NamespacePath(obj)
|
||||||
|
|
||||||
|
if nsp:
|
||||||
|
print(f'Returning NSP instance: {nsp}')
|
||||||
|
return nsp
|
||||||
|
|
||||||
|
logmsg: str = (
|
||||||
|
f'{uid}: Decoding `{obj}: <{type(obj)}>` not supported'
|
||||||
|
f'input type: {type(obj)}\n'
|
||||||
|
f'obj: {obj}\n'
|
||||||
|
f'type(obj): `{type(obj).__class__}`\n'
|
||||||
|
)
|
||||||
|
print(logmsg)
|
||||||
|
raise NotImplementedError(logmsg)
|
||||||
|
|
||||||
|
|
||||||
|
nsp_codec: MsgCodec = mk_codec(
|
||||||
|
ipc_pld_spec=pld_spec,
|
||||||
|
|
||||||
|
# NOTE XXX: the encode hook MUST be used no matter what since
|
||||||
|
# our `NamespacePath` is not any of a `Any` native type nor
|
||||||
|
# a `msgspec.Struct` subtype - so `msgspec` has no way to know
|
||||||
|
# how to encode it unless we provide the custom hook.
|
||||||
|
#
|
||||||
|
# AGAIN that is, regardless of whether we spec an
|
||||||
|
# `Any`-decoded-pld the enc has no knowledge (by default)
|
||||||
|
# how to enc `NamespacePath` (nsp), so we add a custom
|
||||||
|
# hook to do that ALWAYS.
|
||||||
|
enc_hook=enc_nsp,
|
||||||
|
|
||||||
|
# XXX NOTE: pretty sure this is mutex with the `type=` to
|
||||||
|
# `Decoder`? so it won't work in tandem with the
|
||||||
|
# `ipc_pld_spec` passed above?
|
||||||
|
dec_hook=dec_nsp,
|
||||||
|
)
|
||||||
return nsp_codec
|
return nsp_codec
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def send_back_nsp(
|
async def send_back_nsp(
|
||||||
ctx: tractor.Context,
|
ctx: Context,
|
||||||
|
expect_debug: bool,
|
||||||
|
use_any_spec: bool,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -117,28 +173,65 @@ async def send_back_nsp(
|
||||||
and ensure we can round trip a func ref with our parent.
|
and ensure we can round trip a func ref with our parent.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
task: trio.Task = trio.lowlevel.current_task()
|
# debug mode sanity check
|
||||||
task_ctx: Context = task.context
|
assert expect_debug == _state.debug_mode()
|
||||||
assert _ctxvar_MsgCodec not in task_ctx
|
|
||||||
|
|
||||||
nsp_codec: MsgCodec = mk_custom_codec()
|
# task: trio.Task = trio.lowlevel.current_task()
|
||||||
|
|
||||||
|
# TreeVar
|
||||||
|
# curr_codec = _ctxvar_MsgCodec.get_in(task)
|
||||||
|
|
||||||
|
# ContextVar
|
||||||
|
# task_ctx: Context = task.context
|
||||||
|
# assert _ctxvar_MsgCodec not in task_ctx
|
||||||
|
|
||||||
|
curr_codec = _ctxvar_MsgCodec.get()
|
||||||
|
assert curr_codec is _codec._def_tractor_codec
|
||||||
|
|
||||||
|
if use_any_spec:
|
||||||
|
pld_spec = Any
|
||||||
|
else:
|
||||||
|
# NOTE: don't need the |None here since
|
||||||
|
# the parent side will never send `None` like
|
||||||
|
# we do here in the implicit return at the end of this
|
||||||
|
# `@context` body.
|
||||||
|
pld_spec = NamespacePath # |None
|
||||||
|
|
||||||
|
nsp_codec: MsgCodec = mk_custom_codec(
|
||||||
|
pld_spec=pld_spec,
|
||||||
|
)
|
||||||
with apply_codec(nsp_codec) as codec:
|
with apply_codec(nsp_codec) as codec:
|
||||||
chk_codec_applied(
|
chk_codec_applied(
|
||||||
custom_codec=nsp_codec,
|
custom_codec=nsp_codec,
|
||||||
enter_value=codec,
|
enter_value=codec,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# ensure roundtripping works locally
|
||||||
nsp = NamespacePath.from_ref(ex_func)
|
nsp = NamespacePath.from_ref(ex_func)
|
||||||
await ctx.started(nsp)
|
wire_bytes: bytes = nsp_codec.encode(
|
||||||
|
Started(
|
||||||
|
cid=ctx.cid,
|
||||||
|
pld=nsp
|
||||||
|
)
|
||||||
|
)
|
||||||
|
msg: Started = nsp_codec.decode(wire_bytes)
|
||||||
|
pld = msg.pld
|
||||||
|
assert pld == nsp
|
||||||
|
|
||||||
|
await ctx.started(nsp)
|
||||||
async with ctx.open_stream() as ipc:
|
async with ctx.open_stream() as ipc:
|
||||||
async for msg in ipc:
|
async for msg in ipc:
|
||||||
|
|
||||||
assert msg == f'{__name__}:ex_func'
|
if use_any_spec:
|
||||||
|
assert msg == f'{__name__}:ex_func'
|
||||||
|
|
||||||
# TODO: as per below
|
# TODO: as per below
|
||||||
# assert isinstance(msg, NamespacePath)
|
# assert isinstance(msg, NamespacePath)
|
||||||
assert isinstance(msg, str)
|
assert isinstance(msg, str)
|
||||||
|
else:
|
||||||
|
assert isinstance(msg, NamespacePath)
|
||||||
|
|
||||||
|
await ipc.send(msg)
|
||||||
|
|
||||||
|
|
||||||
def chk_codec_applied(
|
def chk_codec_applied(
|
||||||
|
@ -146,11 +239,20 @@ def chk_codec_applied(
|
||||||
enter_value: MsgCodec,
|
enter_value: MsgCodec,
|
||||||
) -> MsgCodec:
|
) -> MsgCodec:
|
||||||
|
|
||||||
task: trio.Task = trio.lowlevel.current_task()
|
# task: trio.Task = trio.lowlevel.current_task()
|
||||||
task_ctx: Context = task.context
|
|
||||||
|
|
||||||
assert _ctxvar_MsgCodec in task_ctx
|
# TreeVar
|
||||||
curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec]
|
# curr_codec = _ctxvar_MsgCodec.get_in(task)
|
||||||
|
|
||||||
|
# ContextVar
|
||||||
|
# task_ctx: Context = task.context
|
||||||
|
# assert _ctxvar_MsgCodec in task_ctx
|
||||||
|
# curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec]
|
||||||
|
|
||||||
|
# RunVar
|
||||||
|
curr_codec: MsgCodec = _ctxvar_MsgCodec.get()
|
||||||
|
last_read_codec = _ctxvar_MsgCodec.get()
|
||||||
|
assert curr_codec is last_read_codec
|
||||||
|
|
||||||
assert (
|
assert (
|
||||||
# returned from `mk_codec()`
|
# returned from `mk_codec()`
|
||||||
|
@ -163,14 +265,31 @@ def chk_codec_applied(
|
||||||
curr_codec is
|
curr_codec is
|
||||||
|
|
||||||
# public API for all of the above
|
# public API for all of the above
|
||||||
current_msgspec_codec()
|
current_codec()
|
||||||
|
|
||||||
# the default `msgspec` settings
|
# the default `msgspec` settings
|
||||||
is not _def_msgspec_codec
|
is not _codec._def_msgspec_codec
|
||||||
|
is not _codec._def_tractor_codec
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_codec_hooks_mod():
|
@pytest.mark.parametrize(
|
||||||
|
'ipc_pld_spec',
|
||||||
|
[
|
||||||
|
# _codec._def_msgspec_codec,
|
||||||
|
Any,
|
||||||
|
# _codec._def_tractor_codec,
|
||||||
|
NamespacePath|None,
|
||||||
|
],
|
||||||
|
ids=[
|
||||||
|
'any_type',
|
||||||
|
'nsp_type',
|
||||||
|
]
|
||||||
|
)
|
||||||
|
def test_codec_hooks_mod(
|
||||||
|
debug_mode: bool,
|
||||||
|
ipc_pld_spec: Union[Type]|Any,
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Audit the `.msg.MsgCodec` override apis details given our impl
|
Audit the `.msg.MsgCodec` override apis details given our impl
|
||||||
uses `contextvars` to accomplish per `trio` task codec
|
uses `contextvars` to accomplish per `trio` task codec
|
||||||
|
@ -178,11 +297,21 @@ def test_codec_hooks_mod():
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
task: trio.Task = trio.lowlevel.current_task()
|
|
||||||
task_ctx: Context = task.context
|
|
||||||
assert _ctxvar_MsgCodec not in task_ctx
|
|
||||||
|
|
||||||
async with tractor.open_nursery() as an:
|
# task: trio.Task = trio.lowlevel.current_task()
|
||||||
|
|
||||||
|
# ContextVar
|
||||||
|
# task_ctx: Context = task.context
|
||||||
|
# assert _ctxvar_MsgCodec not in task_ctx
|
||||||
|
|
||||||
|
# TreeVar
|
||||||
|
# def_codec: MsgCodec = _ctxvar_MsgCodec.get_in(task)
|
||||||
|
def_codec = _ctxvar_MsgCodec.get()
|
||||||
|
assert def_codec is _codec._def_tractor_codec
|
||||||
|
|
||||||
|
async with tractor.open_nursery(
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
) as an:
|
||||||
p: tractor.Portal = await an.start_actor(
|
p: tractor.Portal = await an.start_actor(
|
||||||
'sub',
|
'sub',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
|
@ -192,7 +321,9 @@ def test_codec_hooks_mod():
|
||||||
# - codec not modified -> decode nsp as `str`
|
# - codec not modified -> decode nsp as `str`
|
||||||
# - codec modified with hooks -> decode nsp as
|
# - codec modified with hooks -> decode nsp as
|
||||||
# `NamespacePath`
|
# `NamespacePath`
|
||||||
nsp_codec: MsgCodec = mk_custom_codec()
|
nsp_codec: MsgCodec = mk_custom_codec(
|
||||||
|
pld_spec=ipc_pld_spec,
|
||||||
|
)
|
||||||
with apply_codec(nsp_codec) as codec:
|
with apply_codec(nsp_codec) as codec:
|
||||||
chk_codec_applied(
|
chk_codec_applied(
|
||||||
custom_codec=nsp_codec,
|
custom_codec=nsp_codec,
|
||||||
|
@ -202,9 +333,22 @@ def test_codec_hooks_mod():
|
||||||
async with (
|
async with (
|
||||||
p.open_context(
|
p.open_context(
|
||||||
send_back_nsp,
|
send_back_nsp,
|
||||||
|
# TODO: send the original nsp here and
|
||||||
|
# test with `limit_msg_spec()` above?
|
||||||
|
expect_debug=debug_mode,
|
||||||
|
use_any_spec=(ipc_pld_spec==Any),
|
||||||
|
|
||||||
) as (ctx, first),
|
) as (ctx, first),
|
||||||
ctx.open_stream() as ipc,
|
ctx.open_stream() as ipc,
|
||||||
):
|
):
|
||||||
|
if ipc_pld_spec is NamespacePath:
|
||||||
|
assert isinstance(first, NamespacePath)
|
||||||
|
|
||||||
|
print(
|
||||||
|
'root: ENTERING CONTEXT BLOCK\n'
|
||||||
|
f'type(first): {type(first)}\n'
|
||||||
|
f'first: {first}\n'
|
||||||
|
)
|
||||||
# ensure codec is still applied across
|
# ensure codec is still applied across
|
||||||
# `tractor.Context` + its embedded nursery.
|
# `tractor.Context` + its embedded nursery.
|
||||||
chk_codec_applied(
|
chk_codec_applied(
|
||||||
|
@ -212,23 +356,46 @@ def test_codec_hooks_mod():
|
||||||
enter_value=codec,
|
enter_value=codec,
|
||||||
)
|
)
|
||||||
|
|
||||||
assert first == f'{__name__}:ex_func'
|
first_nsp = NamespacePath(first)
|
||||||
|
|
||||||
|
# ensure roundtripping works
|
||||||
|
wire_bytes: bytes = nsp_codec.encode(
|
||||||
|
Started(
|
||||||
|
cid=ctx.cid,
|
||||||
|
pld=first_nsp
|
||||||
|
)
|
||||||
|
)
|
||||||
|
msg: Started = nsp_codec.decode(wire_bytes)
|
||||||
|
pld = msg.pld
|
||||||
|
assert pld == first_nsp
|
||||||
|
|
||||||
|
# try a manual decode of the started msg+pld
|
||||||
|
|
||||||
# TODO: actually get the decoder loading
|
# TODO: actually get the decoder loading
|
||||||
# to native once we spec our SCIPP msgspec
|
# to native once we spec our SCIPP msgspec
|
||||||
# (structurred-conc-inter-proc-protocol)
|
# (structurred-conc-inter-proc-protocol)
|
||||||
# implemented as per,
|
# implemented as per,
|
||||||
# https://github.com/goodboy/tractor/issues/36
|
# https://github.com/goodboy/tractor/issues/36
|
||||||
#
|
#
|
||||||
# assert isinstance(first, NamespacePath)
|
if ipc_pld_spec is NamespacePath:
|
||||||
assert isinstance(first, str)
|
assert isinstance(first, NamespacePath)
|
||||||
|
|
||||||
|
# `Any`-payload-spec case
|
||||||
|
else:
|
||||||
|
assert isinstance(first, str)
|
||||||
|
assert first == f'{__name__}:ex_func'
|
||||||
|
|
||||||
await ipc.send(first)
|
await ipc.send(first)
|
||||||
|
|
||||||
with trio.move_on_after(1):
|
with trio.move_on_after(.6):
|
||||||
async for msg in ipc:
|
async for msg in ipc:
|
||||||
|
print(msg)
|
||||||
|
|
||||||
# TODO: as per above
|
# TODO: as per above
|
||||||
# assert isinstance(msg, NamespacePath)
|
# assert isinstance(msg, NamespacePath)
|
||||||
assert isinstance(msg, str)
|
assert isinstance(msg, str)
|
||||||
|
await ipc.send(msg)
|
||||||
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
await p.cancel_actor()
|
await p.cancel_actor()
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ sync-opening a ``tractor.Context`` beforehand.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from itertools import count
|
from itertools import count
|
||||||
|
import math
|
||||||
import platform
|
import platform
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
from typing import (
|
from typing import (
|
||||||
|
@ -845,7 +846,10 @@ async def keep_sending_from_callee(
|
||||||
('caller', 1, never_open_stream),
|
('caller', 1, never_open_stream),
|
||||||
('callee', 0, keep_sending_from_callee),
|
('callee', 0, keep_sending_from_callee),
|
||||||
],
|
],
|
||||||
ids='overrun_condition={}'.format,
|
ids=[
|
||||||
|
('caller_1buf_never_open_stream'),
|
||||||
|
('callee_0buf_keep_sending_from_callee'),
|
||||||
|
]
|
||||||
)
|
)
|
||||||
def test_one_end_stream_not_opened(
|
def test_one_end_stream_not_opened(
|
||||||
overrun_by: tuple[str, int, Callable],
|
overrun_by: tuple[str, int, Callable],
|
||||||
|
@ -869,29 +873,30 @@ def test_one_end_stream_not_opened(
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
)
|
)
|
||||||
|
|
||||||
async with portal.open_context(
|
with trio.fail_after(0.8):
|
||||||
entrypoint,
|
async with portal.open_context(
|
||||||
) as (ctx, sent):
|
entrypoint,
|
||||||
assert sent is None
|
) as (ctx, sent):
|
||||||
|
assert sent is None
|
||||||
|
|
||||||
if 'caller' in overrunner:
|
if 'caller' in overrunner:
|
||||||
|
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
|
|
||||||
# itersend +1 msg more then the buffer size
|
# itersend +1 msg more then the buffer size
|
||||||
# to cause the most basic overrun.
|
# to cause the most basic overrun.
|
||||||
for i in range(buf_size):
|
for i in range(buf_size):
|
||||||
print(f'sending {i}')
|
print(f'sending {i}')
|
||||||
await stream.send(i)
|
await stream.send(i)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# expect overrun error to be relayed back
|
# expect overrun error to be relayed back
|
||||||
# and this sleep interrupted
|
# and this sleep interrupted
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# callee overruns caller case so we do nothing here
|
# callee overruns caller case so we do nothing here
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
@ -1055,54 +1060,63 @@ def test_maybe_allow_overruns_stream(
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
debug_mode=debug_mode,
|
debug_mode=debug_mode,
|
||||||
)
|
)
|
||||||
seq = list(range(10))
|
|
||||||
async with portal.open_context(
|
|
||||||
echo_back_sequence,
|
|
||||||
seq=seq,
|
|
||||||
wait_for_cancel=cancel_ctx,
|
|
||||||
be_slow=(slow_side == 'child'),
|
|
||||||
allow_overruns_side=allow_overruns_side,
|
|
||||||
|
|
||||||
) as (ctx, sent):
|
# stream-sequence batch info with send delay to determine
|
||||||
assert sent is None
|
# approx timeout determining whether test has hung.
|
||||||
|
total_batches: int = 2
|
||||||
|
num_items: int = 10
|
||||||
|
seq = list(range(num_items))
|
||||||
|
parent_send_delay: float = 0.16
|
||||||
|
timeout: float = math.ceil(
|
||||||
|
total_batches * num_items * parent_send_delay
|
||||||
|
)
|
||||||
|
with trio.fail_after(timeout):
|
||||||
|
async with portal.open_context(
|
||||||
|
echo_back_sequence,
|
||||||
|
seq=seq,
|
||||||
|
wait_for_cancel=cancel_ctx,
|
||||||
|
be_slow=(slow_side == 'child'),
|
||||||
|
allow_overruns_side=allow_overruns_side,
|
||||||
|
|
||||||
async with ctx.open_stream(
|
) as (ctx, sent):
|
||||||
msg_buffer_size=1 if slow_side == 'parent' else None,
|
assert sent is None
|
||||||
allow_overruns=(allow_overruns_side in {'parent', 'both'}),
|
|
||||||
) as stream:
|
|
||||||
|
|
||||||
total_batches: int = 2
|
async with ctx.open_stream(
|
||||||
for _ in range(total_batches):
|
msg_buffer_size=1 if slow_side == 'parent' else None,
|
||||||
for msg in seq:
|
allow_overruns=(allow_overruns_side in {'parent', 'both'}),
|
||||||
# print(f'root tx {msg}')
|
) as stream:
|
||||||
await stream.send(msg)
|
|
||||||
if slow_side == 'parent':
|
|
||||||
# NOTE: we make the parent slightly
|
|
||||||
# slower, when it is slow, to make sure
|
|
||||||
# that in the overruns everywhere case
|
|
||||||
await trio.sleep(0.16)
|
|
||||||
|
|
||||||
batch = []
|
for _ in range(total_batches):
|
||||||
async for msg in stream:
|
for msg in seq:
|
||||||
print(f'root rx {msg}')
|
# print(f'root tx {msg}')
|
||||||
batch.append(msg)
|
await stream.send(msg)
|
||||||
if batch == seq:
|
if slow_side == 'parent':
|
||||||
break
|
# NOTE: we make the parent slightly
|
||||||
|
# slower, when it is slow, to make sure
|
||||||
|
# that in the overruns everywhere case
|
||||||
|
await trio.sleep(parent_send_delay)
|
||||||
|
|
||||||
|
batch = []
|
||||||
|
async for msg in stream:
|
||||||
|
print(f'root rx {msg}')
|
||||||
|
batch.append(msg)
|
||||||
|
if batch == seq:
|
||||||
|
break
|
||||||
|
|
||||||
|
if cancel_ctx:
|
||||||
|
# cancel the remote task
|
||||||
|
print('Requesting `ctx.cancel()` in parent!')
|
||||||
|
await ctx.cancel()
|
||||||
|
|
||||||
|
res: str|ContextCancelled = await ctx.result()
|
||||||
|
|
||||||
if cancel_ctx:
|
if cancel_ctx:
|
||||||
# cancel the remote task
|
assert isinstance(res, ContextCancelled)
|
||||||
print('Requesting `ctx.cancel()` in parent!')
|
assert tuple(res.canceller) == current_actor().uid
|
||||||
await ctx.cancel()
|
|
||||||
|
|
||||||
res: str|ContextCancelled = await ctx.result()
|
else:
|
||||||
|
print(f'RX ROOT SIDE RESULT {res}')
|
||||||
if cancel_ctx:
|
assert res == 'yo'
|
||||||
assert isinstance(res, ContextCancelled)
|
|
||||||
assert tuple(res.canceller) == current_actor().uid
|
|
||||||
|
|
||||||
else:
|
|
||||||
print(f'RX ROOT SIDE RESULT {res}')
|
|
||||||
assert res == 'yo'
|
|
||||||
|
|
||||||
# cancel the daemon
|
# cancel the daemon
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
|
@ -31,25 +31,24 @@ from ._codec import (
|
||||||
apply_codec as apply_codec,
|
apply_codec as apply_codec,
|
||||||
mk_codec as mk_codec,
|
mk_codec as mk_codec,
|
||||||
MsgCodec as MsgCodec,
|
MsgCodec as MsgCodec,
|
||||||
current_msgspec_codec as current_msgspec_codec,
|
current_codec as current_codec,
|
||||||
)
|
)
|
||||||
|
|
||||||
from .types import (
|
from .types import (
|
||||||
Msg as Msg,
|
Msg as Msg,
|
||||||
|
|
||||||
Start as Start, # with pld
|
Aid as Aid,
|
||||||
FuncSpec as FuncSpec,
|
SpawnSpec as SpawnSpec,
|
||||||
|
|
||||||
StartAck as StartAck, # with pld
|
Start as Start,
|
||||||
IpcCtxSpec as IpcCtxSpec,
|
StartAck as StartAck,
|
||||||
|
|
||||||
Started as Started,
|
Started as Started,
|
||||||
Yield as Yield,
|
Yield as Yield,
|
||||||
Stop as Stop,
|
Stop as Stop,
|
||||||
Return as Return,
|
Return as Return,
|
||||||
|
|
||||||
Error as Error, # with pld
|
Error as Error,
|
||||||
ErrorData as ErrorData,
|
|
||||||
|
|
||||||
# full msg spec set
|
# full msg spec set
|
||||||
__spec__ as __spec__,
|
__spec__ as __spec__,
|
||||||
|
|
|
@ -30,13 +30,13 @@ ToDo: backends we prolly should offer:
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from contextvars import (
|
|
||||||
ContextVar,
|
|
||||||
Token,
|
|
||||||
)
|
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
contextmanager as cm,
|
contextmanager as cm,
|
||||||
)
|
)
|
||||||
|
# from contextvars import (
|
||||||
|
# ContextVar,
|
||||||
|
# Token,
|
||||||
|
# )
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Callable,
|
Callable,
|
||||||
|
@ -47,6 +47,12 @@ from types import ModuleType
|
||||||
|
|
||||||
import msgspec
|
import msgspec
|
||||||
from msgspec import msgpack
|
from msgspec import msgpack
|
||||||
|
from trio.lowlevel import (
|
||||||
|
RunVar,
|
||||||
|
RunVarToken,
|
||||||
|
)
|
||||||
|
# TODO: see notes below from @mikenerone..
|
||||||
|
# from tricycle import TreeVar
|
||||||
|
|
||||||
from tractor.msg.pretty_struct import Struct
|
from tractor.msg.pretty_struct import Struct
|
||||||
from tractor.msg.types import (
|
from tractor.msg.types import (
|
||||||
|
@ -72,6 +78,9 @@ class MsgCodec(Struct):
|
||||||
'''
|
'''
|
||||||
A IPC msg interchange format lib's encoder + decoder pair.
|
A IPC msg interchange format lib's encoder + decoder pair.
|
||||||
|
|
||||||
|
Pretty much nothing more then delegation to underlying
|
||||||
|
`msgspec.<interchange-protocol>.Encoder/Decoder`s for now.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
_enc: msgpack.Encoder
|
_enc: msgpack.Encoder
|
||||||
_dec: msgpack.Decoder
|
_dec: msgpack.Decoder
|
||||||
|
@ -86,11 +95,6 @@ class MsgCodec(Struct):
|
||||||
|
|
||||||
lib: ModuleType = msgspec
|
lib: ModuleType = msgspec
|
||||||
|
|
||||||
# ad-hoc type extensions
|
|
||||||
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
|
|
||||||
enc_hook: Callable[[Any], Any]|None = None # coder
|
|
||||||
dec_hook: Callable[[type, Any], Any]|None = None # decoder
|
|
||||||
|
|
||||||
# TODO: a sub-decoder system as well?
|
# TODO: a sub-decoder system as well?
|
||||||
# payload_msg_specs: Union[Type[Struct]] = Any
|
# payload_msg_specs: Union[Type[Struct]] = Any
|
||||||
# see related comments in `.msg.types`
|
# see related comments in `.msg.types`
|
||||||
|
@ -304,7 +308,8 @@ def mk_codec(
|
||||||
|
|
||||||
libname: str = 'msgspec',
|
libname: str = 'msgspec',
|
||||||
|
|
||||||
# proxy as `Struct(**kwargs)`
|
# proxy as `Struct(**kwargs)` for ad-hoc type extensions
|
||||||
|
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
|
||||||
# ------ - ------
|
# ------ - ------
|
||||||
dec_hook: Callable|None = None,
|
dec_hook: Callable|None = None,
|
||||||
enc_hook: Callable|None = None,
|
enc_hook: Callable|None = None,
|
||||||
|
@ -389,14 +394,52 @@ def mk_codec(
|
||||||
# no custom structs, hooks or other special types.
|
# no custom structs, hooks or other special types.
|
||||||
_def_msgspec_codec: MsgCodec = mk_codec(ipc_msg_spec=Any)
|
_def_msgspec_codec: MsgCodec = mk_codec(ipc_msg_spec=Any)
|
||||||
|
|
||||||
# NOTE: provides for per-`trio.Task` specificity of the
|
# The built-in IPC `Msg` spec.
|
||||||
|
# Our composing "shuttle" protocol which allows `tractor`-app code
|
||||||
|
# to use any `msgspec` supported type as the `Msg.pld` payload,
|
||||||
|
# https://jcristharif.com/msgspec/supported-types.html
|
||||||
|
#
|
||||||
|
_def_tractor_codec: MsgCodec = mk_codec(
|
||||||
|
ipc_pld_spec=Any,
|
||||||
|
)
|
||||||
|
# TODO: IDEALLY provides for per-`trio.Task` specificity of the
|
||||||
# IPC msging codec used by the transport layer when doing
|
# IPC msging codec used by the transport layer when doing
|
||||||
# `Channel.send()/.recv()` of wire data.
|
# `Channel.send()/.recv()` of wire data.
|
||||||
_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
|
|
||||||
|
# ContextVar-TODO: DIDN'T WORK, kept resetting in every new task to default!?
|
||||||
|
# _ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
|
||||||
|
|
||||||
|
# TreeVar-TODO: DIDN'T WORK, kept resetting in every new embedded nursery
|
||||||
|
# even though it's supposed to inherit from a parent context ???
|
||||||
|
#
|
||||||
|
# _ctxvar_MsgCodec: TreeVar[MsgCodec] = TreeVar(
|
||||||
|
#
|
||||||
|
# ^-NOTE-^: for this to work see the mods by @mikenerone from `trio` gitter:
|
||||||
|
#
|
||||||
|
# 22:02:54 <mikenerone> even for regular contextvars, all you have to do is:
|
||||||
|
# `task: Task = trio.lowlevel.current_task()`
|
||||||
|
# `task.parent_nursery.parent_task.context.run(my_ctx_var.set, new_value)`
|
||||||
|
#
|
||||||
|
# From a comment in his prop code he couldn't share outright:
|
||||||
|
# 1. For every TreeVar set in the current task (which covers what
|
||||||
|
# we need from SynchronizerFacade), walk up the tree until the
|
||||||
|
# root or finding one where the TreeVar is already set, setting
|
||||||
|
# it in all of the contexts along the way.
|
||||||
|
# 2. For each of those, we also forcibly set the values that are
|
||||||
|
# pending for child nurseries that have not yet accessed the
|
||||||
|
# TreeVar.
|
||||||
|
# 3. We similarly set the pending values for the child nurseries
|
||||||
|
# of the *current* task.
|
||||||
|
#
|
||||||
|
|
||||||
|
# TODO: STOP USING THIS, since it's basically a global and won't
|
||||||
|
# allow sub-IPC-ctxs to limit the msg-spec however desired..
|
||||||
|
_ctxvar_MsgCodec: MsgCodec = RunVar(
|
||||||
'msgspec_codec',
|
'msgspec_codec',
|
||||||
|
|
||||||
# TODO: move this to our new `Msg`-spec!
|
# TODO: move this to our new `Msg`-spec!
|
||||||
default=_def_msgspec_codec,
|
default=_def_msgspec_codec,
|
||||||
|
# default=_def_tractor_codec,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -410,15 +453,36 @@ def apply_codec(
|
||||||
runtime context such that all IPC msgs are processed
|
runtime context such that all IPC msgs are processed
|
||||||
with it for that task.
|
with it for that task.
|
||||||
|
|
||||||
|
Uses a `tricycle.TreeVar` to ensure the scope of the codec
|
||||||
|
matches the `@cm` block and DOES NOT change to the original
|
||||||
|
(default) value in new tasks (as it does for `ContextVar`).
|
||||||
|
|
||||||
|
See the docs:
|
||||||
|
- https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
|
||||||
|
- https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
|
||||||
|
|
||||||
'''
|
'''
|
||||||
token: Token = _ctxvar_MsgCodec.set(codec)
|
orig: MsgCodec = _ctxvar_MsgCodec.get()
|
||||||
|
assert orig is not codec
|
||||||
|
token: RunVarToken = _ctxvar_MsgCodec.set(codec)
|
||||||
|
|
||||||
|
# TODO: for TreeVar approach, see docs for @cm `.being()` API:
|
||||||
|
# https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
|
||||||
|
# try:
|
||||||
|
# with _ctxvar_MsgCodec.being(codec):
|
||||||
|
# new = _ctxvar_MsgCodec.get()
|
||||||
|
# assert new is codec
|
||||||
|
# yield codec
|
||||||
|
|
||||||
try:
|
try:
|
||||||
yield _ctxvar_MsgCodec.get()
|
yield _ctxvar_MsgCodec.get()
|
||||||
finally:
|
finally:
|
||||||
_ctxvar_MsgCodec.reset(token)
|
_ctxvar_MsgCodec.reset(token)
|
||||||
|
|
||||||
|
assert _ctxvar_MsgCodec.get() is orig
|
||||||
|
|
||||||
def current_msgspec_codec() -> MsgCodec:
|
|
||||||
|
def current_codec() -> MsgCodec:
|
||||||
'''
|
'''
|
||||||
Return the current `trio.Task.context`'s value
|
Return the current `trio.Task.context`'s value
|
||||||
for `msgspec_codec` used by `Channel.send/.recv()`
|
for `msgspec_codec` used by `Channel.send/.recv()`
|
||||||
|
@ -449,5 +513,6 @@ def limit_msg_spec(
|
||||||
payload_types=payload_types,
|
payload_types=payload_types,
|
||||||
**codec_kwargs,
|
**codec_kwargs,
|
||||||
)
|
)
|
||||||
with apply_codec(msgspec_codec):
|
with apply_codec(msgspec_codec) as applied_codec:
|
||||||
|
assert applied_codec is msgspec_codec
|
||||||
yield msgspec_codec
|
yield msgspec_codec
|
||||||
|
|
|
@ -80,6 +80,28 @@ class DiffDump(UserList):
|
||||||
return repstr
|
return repstr
|
||||||
|
|
||||||
|
|
||||||
|
def iter_fields(struct: Struct) -> Iterator[
|
||||||
|
tuple[
|
||||||
|
structs.FieldIinfo,
|
||||||
|
str,
|
||||||
|
Any,
|
||||||
|
]
|
||||||
|
]:
|
||||||
|
'''
|
||||||
|
Iterate over all non-@property fields of this struct.
|
||||||
|
|
||||||
|
'''
|
||||||
|
fi: structs.FieldInfo
|
||||||
|
for fi in structs.fields(struct):
|
||||||
|
key: str = fi.name
|
||||||
|
val: Any = getattr(struct, key)
|
||||||
|
yield (
|
||||||
|
fi,
|
||||||
|
key,
|
||||||
|
val,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class Struct(
|
class Struct(
|
||||||
_Struct,
|
_Struct,
|
||||||
|
|
||||||
|
@ -91,23 +113,6 @@ class Struct(
|
||||||
A "human friendlier" (aka repl buddy) struct subtype.
|
A "human friendlier" (aka repl buddy) struct subtype.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
def _sin_props(self) -> Iterator[
|
|
||||||
tuple[
|
|
||||||
structs.FieldIinfo,
|
|
||||||
str,
|
|
||||||
Any,
|
|
||||||
]
|
|
||||||
]:
|
|
||||||
'''
|
|
||||||
Iterate over all non-@property fields of this struct.
|
|
||||||
|
|
||||||
'''
|
|
||||||
fi: structs.FieldInfo
|
|
||||||
for fi in structs.fields(self):
|
|
||||||
key: str = fi.name
|
|
||||||
val: Any = getattr(self, key)
|
|
||||||
yield fi, key, val
|
|
||||||
|
|
||||||
def to_dict(
|
def to_dict(
|
||||||
self,
|
self,
|
||||||
include_non_members: bool = True,
|
include_non_members: bool = True,
|
||||||
|
@ -130,7 +135,7 @@ class Struct(
|
||||||
# added as type-defined `@property` methods!
|
# added as type-defined `@property` methods!
|
||||||
sin_props: dict = {}
|
sin_props: dict = {}
|
||||||
fi: structs.FieldInfo
|
fi: structs.FieldInfo
|
||||||
for fi, k, v in self._sin_props():
|
for fi, k, v in iter_fields(self):
|
||||||
sin_props[k] = asdict[k]
|
sin_props[k] = asdict[k]
|
||||||
|
|
||||||
return sin_props
|
return sin_props
|
||||||
|
@ -159,7 +164,7 @@ class Struct(
|
||||||
fi: structs.FieldInfo
|
fi: structs.FieldInfo
|
||||||
k: str
|
k: str
|
||||||
v: Any
|
v: Any
|
||||||
for fi, k, v in self._sin_props():
|
for fi, k, v in iter_fields(self):
|
||||||
|
|
||||||
# TODO: how can we prefer `Literal['option1', 'option2,
|
# TODO: how can we prefer `Literal['option1', 'option2,
|
||||||
# ..]` over .__name__ == `Literal` but still get only the
|
# ..]` over .__name__ == `Literal` but still get only the
|
||||||
|
|
|
@ -26,6 +26,7 @@ from __future__ import annotations
|
||||||
import types
|
import types
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
|
Callable,
|
||||||
Generic,
|
Generic,
|
||||||
Literal,
|
Literal,
|
||||||
Type,
|
Type,
|
||||||
|
@ -37,8 +38,12 @@ from msgspec import (
|
||||||
defstruct,
|
defstruct,
|
||||||
# field,
|
# field,
|
||||||
Struct,
|
Struct,
|
||||||
UNSET,
|
# UNSET,
|
||||||
UnsetType,
|
# UnsetType,
|
||||||
|
)
|
||||||
|
|
||||||
|
from tractor.msg import (
|
||||||
|
pretty_struct,
|
||||||
)
|
)
|
||||||
|
|
||||||
# type variable for the boxed payload field `.pld`
|
# type variable for the boxed payload field `.pld`
|
||||||
|
@ -48,11 +53,19 @@ PayloadT = TypeVar('PayloadT')
|
||||||
class Msg(
|
class Msg(
|
||||||
Struct,
|
Struct,
|
||||||
Generic[PayloadT],
|
Generic[PayloadT],
|
||||||
|
|
||||||
|
# https://jcristharif.com/msgspec/structs.html#tagged-unions
|
||||||
tag=True,
|
tag=True,
|
||||||
tag_field='msg_type',
|
tag_field='msg_type',
|
||||||
|
|
||||||
# eq=True,
|
# https://jcristharif.com/msgspec/structs.html#field-ordering
|
||||||
|
# kw_only=True,
|
||||||
|
|
||||||
|
# https://jcristharif.com/msgspec/structs.html#equality-and-order
|
||||||
# order=True,
|
# order=True,
|
||||||
|
|
||||||
|
# https://jcristharif.com/msgspec/structs.html#encoding-decoding-as-arrays
|
||||||
|
# as_array=True,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
The "god" boxing msg type.
|
The "god" boxing msg type.
|
||||||
|
@ -90,6 +103,53 @@ class Msg(
|
||||||
pld: PayloadT
|
pld: PayloadT
|
||||||
|
|
||||||
|
|
||||||
|
class Aid(
|
||||||
|
Struct,
|
||||||
|
tag=True,
|
||||||
|
tag_field='msg_type',
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Actor-identity msg.
|
||||||
|
|
||||||
|
Initial contact exchange enabling an actor "mailbox handshake"
|
||||||
|
delivering the peer identity (and maybe eventually contact)
|
||||||
|
info.
|
||||||
|
|
||||||
|
Used by discovery protocol to register actors as well as
|
||||||
|
conduct the initial comms (capability) filtering.
|
||||||
|
|
||||||
|
'''
|
||||||
|
name: str
|
||||||
|
uuid: str
|
||||||
|
# TODO: use built-in support for UUIDs?
|
||||||
|
# -[ ] `uuid.UUID` which has multi-protocol support
|
||||||
|
# https://jcristharif.com/msgspec/supported-types.html#uuid
|
||||||
|
|
||||||
|
|
||||||
|
class SpawnSpec(
|
||||||
|
pretty_struct.Struct,
|
||||||
|
tag=True,
|
||||||
|
tag_field='msg_type',
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Initial runtime spec handed down from a spawning parent to its
|
||||||
|
child subactor immediately following first contact via an
|
||||||
|
`Aid` msg.
|
||||||
|
|
||||||
|
'''
|
||||||
|
_parent_main_data: dict
|
||||||
|
_runtime_vars: dict[str, Any]
|
||||||
|
|
||||||
|
# module import capability
|
||||||
|
enable_modules: dict[str, str]
|
||||||
|
|
||||||
|
# TODO: not just sockaddr pairs?
|
||||||
|
# -[ ] abstract into a `TransportAddr` type?
|
||||||
|
reg_addrs: list[tuple[str, int]]
|
||||||
|
bind_addrs: list[tuple[str, int]]
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: caps based RPC support in the payload?
|
# TODO: caps based RPC support in the payload?
|
||||||
#
|
#
|
||||||
# -[ ] integration with our ``enable_modules: list[str]`` caps sys.
|
# -[ ] integration with our ``enable_modules: list[str]`` caps sys.
|
||||||
|
@ -105,18 +165,31 @@ class Msg(
|
||||||
#
|
#
|
||||||
# -[ ] can we combine .ns + .func into a native `NamespacePath` field?
|
# -[ ] can we combine .ns + .func into a native `NamespacePath` field?
|
||||||
#
|
#
|
||||||
# -[ ]better name, like `Call/TaskInput`?
|
# -[ ] better name, like `Call/TaskInput`?
|
||||||
#
|
#
|
||||||
class FuncSpec(Struct):
|
# -[ ] XXX a debugger lock msg transaction with payloads like,
|
||||||
ns: str
|
# child -> `.pld: DebugLock` -> root
|
||||||
func: str
|
# child <- `.pld: DebugLocked` <- root
|
||||||
|
# child -> `.pld: DebugRelease` -> root
|
||||||
kwargs: dict
|
#
|
||||||
uid: str # (calling) actor-id
|
# WHY => when a pld spec is provided it might not allow for
|
||||||
|
# debug mode msgs as they currently are (using plain old `pld.
|
||||||
|
# str` payloads) so we only when debug_mode=True we need to
|
||||||
|
# union in this debugger payload set?
|
||||||
|
#
|
||||||
|
# mk_msg_spec(
|
||||||
|
# MyPldSpec,
|
||||||
|
# debug_mode=True,
|
||||||
|
# ) -> (
|
||||||
|
# Union[MyPldSpec]
|
||||||
|
# | Union[DebugLock, DebugLocked, DebugRelease]
|
||||||
|
# )
|
||||||
|
|
||||||
|
|
||||||
class Start(
|
class Start(
|
||||||
Msg,
|
Struct,
|
||||||
|
tag=True,
|
||||||
|
tag_field='msg_type',
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Initial request to remotely schedule an RPC `trio.Task` via
|
Initial request to remotely schedule an RPC `trio.Task` via
|
||||||
|
@ -134,14 +207,26 @@ class Start(
|
||||||
- `Context.open_context()`
|
- `Context.open_context()`
|
||||||
|
|
||||||
'''
|
'''
|
||||||
pld: FuncSpec
|
cid: str
|
||||||
|
|
||||||
|
ns: str
|
||||||
|
func: str
|
||||||
|
|
||||||
|
kwargs: dict
|
||||||
|
uid: tuple[str, str] # (calling) actor-id
|
||||||
|
|
||||||
|
|
||||||
class IpcCtxSpec(Struct):
|
class StartAck(
|
||||||
|
Struct,
|
||||||
|
tag=True,
|
||||||
|
tag_field='msg_type',
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
An inter-actor-`trio.Task`-comms `Context` spec.
|
Init response to a `Cmd` request indicating the far
|
||||||
|
end's RPC spec, namely its callable "type".
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
cid: str
|
||||||
# TODO: maybe better names for all these?
|
# TODO: maybe better names for all these?
|
||||||
# -[ ] obvi ^ would need sync with `._rpc`
|
# -[ ] obvi ^ would need sync with `._rpc`
|
||||||
functype: Literal[
|
functype: Literal[
|
||||||
|
@ -160,18 +245,6 @@ class IpcCtxSpec(Struct):
|
||||||
# msgspec: MsgSpec
|
# msgspec: MsgSpec
|
||||||
|
|
||||||
|
|
||||||
class StartAck(
|
|
||||||
Msg,
|
|
||||||
Generic[PayloadT],
|
|
||||||
):
|
|
||||||
'''
|
|
||||||
Init response to a `Cmd` request indicating the far
|
|
||||||
end's RPC callable "type".
|
|
||||||
|
|
||||||
'''
|
|
||||||
pld: IpcCtxSpec
|
|
||||||
|
|
||||||
|
|
||||||
class Started(
|
class Started(
|
||||||
Msg,
|
Msg,
|
||||||
Generic[PayloadT],
|
Generic[PayloadT],
|
||||||
|
@ -202,13 +275,19 @@ class Yield(
|
||||||
pld: PayloadT
|
pld: PayloadT
|
||||||
|
|
||||||
|
|
||||||
class Stop(Msg):
|
class Stop(
|
||||||
|
Struct,
|
||||||
|
tag=True,
|
||||||
|
tag_field='msg_type',
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Stream termination signal much like an IPC version
|
Stream termination signal much like an IPC version
|
||||||
of `StopAsyncIteration`.
|
of `StopAsyncIteration`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
pld: UnsetType = UNSET
|
cid: str
|
||||||
|
# TODO: do we want to support a payload on stop?
|
||||||
|
# pld: UnsetType = UNSET
|
||||||
|
|
||||||
|
|
||||||
class Return(
|
class Return(
|
||||||
|
@ -223,32 +302,33 @@ class Return(
|
||||||
pld: PayloadT
|
pld: PayloadT
|
||||||
|
|
||||||
|
|
||||||
class ErrorData(Struct):
|
class Error(
|
||||||
|
Struct,
|
||||||
|
tag=True,
|
||||||
|
tag_field='msg_type',
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Remote actor error meta-data as needed originally by
|
A pkt that wraps `RemoteActorError`s for relay and raising.
|
||||||
|
|
||||||
|
Fields are 1-to-1 meta-data as needed originally by
|
||||||
`RemoteActorError.msgdata: dict`.
|
`RemoteActorError.msgdata: dict`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
src_uid: str
|
src_uid: tuple[str, str]
|
||||||
src_type_str: str
|
src_type_str: str
|
||||||
boxed_type_str: str
|
boxed_type_str: str
|
||||||
|
relay_path: list[tuple[str, str]]
|
||||||
relay_path: list[str]
|
|
||||||
tb_str: str
|
tb_str: str
|
||||||
|
|
||||||
|
cid: str|None = None
|
||||||
|
|
||||||
|
# TODO: use UNSET or don't include them via
|
||||||
|
#
|
||||||
# `ContextCancelled`
|
# `ContextCancelled`
|
||||||
canceller: str|None = None
|
canceller: tuple[str, str]|None = None
|
||||||
|
|
||||||
# `StreamOverrun`
|
# `StreamOverrun`
|
||||||
sender: str|None = None
|
sender: tuple[str, str]|None = None
|
||||||
|
|
||||||
|
|
||||||
class Error(Msg):
|
|
||||||
'''
|
|
||||||
A pkt that wraps `RemoteActorError`s for relay.
|
|
||||||
|
|
||||||
'''
|
|
||||||
pld: ErrorData
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: should be make a msg version of `ContextCancelled?`
|
# TODO: should be make a msg version of `ContextCancelled?`
|
||||||
|
@ -265,6 +345,12 @@ class Error(Msg):
|
||||||
# approx order of the IPC txn-state spaces.
|
# approx order of the IPC txn-state spaces.
|
||||||
__spec__: list[Msg] = [
|
__spec__: list[Msg] = [
|
||||||
|
|
||||||
|
# identity handshake
|
||||||
|
Aid,
|
||||||
|
|
||||||
|
# spawn specification from parent
|
||||||
|
SpawnSpec,
|
||||||
|
|
||||||
# inter-actor RPC initiation
|
# inter-actor RPC initiation
|
||||||
Start,
|
Start,
|
||||||
StartAck,
|
StartAck,
|
||||||
|
@ -280,6 +366,8 @@ __spec__: list[Msg] = [
|
||||||
]
|
]
|
||||||
|
|
||||||
_runtime_spec_msgs: list[Msg] = [
|
_runtime_spec_msgs: list[Msg] = [
|
||||||
|
Aid,
|
||||||
|
SpawnSpec,
|
||||||
Start,
|
Start,
|
||||||
StartAck,
|
StartAck,
|
||||||
Stop,
|
Stop,
|
||||||
|
@ -443,3 +531,99 @@ def mk_msg_spec(
|
||||||
pld_spec | runtime_spec,
|
pld_spec | runtime_spec,
|
||||||
msgtypes_table[spec_build_method] + ipc_msg_types,
|
msgtypes_table[spec_build_method] + ipc_msg_types,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: make something similar to this inside `._codec` such that
|
||||||
|
# user can just pass a type table of some sort?
|
||||||
|
# def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]:
|
||||||
|
# '''
|
||||||
|
# Deliver a `enc_hook()`/`dec_hook()` pair which does
|
||||||
|
# manual convertion from our above native `Msg` set
|
||||||
|
# to `dict` equivalent (wire msgs) in order to keep legacy compat
|
||||||
|
# with the original runtime implementation.
|
||||||
|
|
||||||
|
# Note: this is is/was primarly used while moving the core
|
||||||
|
# runtime over to using native `Msg`-struct types wherein we
|
||||||
|
# start with the send side emitting without loading
|
||||||
|
# a typed-decoder and then later flipping the switch over to
|
||||||
|
# load to the native struct types once all runtime usage has
|
||||||
|
# been adjusted appropriately.
|
||||||
|
|
||||||
|
# '''
|
||||||
|
# def enc_to_dict(msg: Any) -> Any:
|
||||||
|
# '''
|
||||||
|
# Encode `Msg`-structs to `dict` msgs instead
|
||||||
|
# of using `msgspec.msgpack.Decoder.type`-ed
|
||||||
|
# features.
|
||||||
|
|
||||||
|
# '''
|
||||||
|
# match msg:
|
||||||
|
# case Start():
|
||||||
|
# dctmsg: dict = pretty_struct.Struct.to_dict(
|
||||||
|
# msg
|
||||||
|
# )['pld']
|
||||||
|
|
||||||
|
# case Error():
|
||||||
|
# dctmsg: dict = pretty_struct.Struct.to_dict(
|
||||||
|
# msg
|
||||||
|
# )['pld']
|
||||||
|
# return {'error': dctmsg}
|
||||||
|
|
||||||
|
|
||||||
|
# def dec_from_dict(
|
||||||
|
# type: Type,
|
||||||
|
# obj: Any,
|
||||||
|
# ) -> Any:
|
||||||
|
# '''
|
||||||
|
# Decode to `Msg`-structs from `dict` msgs instead
|
||||||
|
# of using `msgspec.msgpack.Decoder.type`-ed
|
||||||
|
# features.
|
||||||
|
|
||||||
|
# '''
|
||||||
|
# cid: str = obj.get('cid')
|
||||||
|
# match obj:
|
||||||
|
# case {'cmd': pld}:
|
||||||
|
# return Start(
|
||||||
|
# cid=cid,
|
||||||
|
# pld=pld,
|
||||||
|
# )
|
||||||
|
# case {'functype': pld}:
|
||||||
|
# return StartAck(
|
||||||
|
# cid=cid,
|
||||||
|
# functype=pld,
|
||||||
|
# # pld=IpcCtxSpec(
|
||||||
|
# # functype=pld,
|
||||||
|
# # ),
|
||||||
|
# )
|
||||||
|
# case {'started': pld}:
|
||||||
|
# return Started(
|
||||||
|
# cid=cid,
|
||||||
|
# pld=pld,
|
||||||
|
# )
|
||||||
|
# case {'yield': pld}:
|
||||||
|
# return Yield(
|
||||||
|
# cid=obj['cid'],
|
||||||
|
# pld=pld,
|
||||||
|
# )
|
||||||
|
# case {'stop': pld}:
|
||||||
|
# return Stop(
|
||||||
|
# cid=cid,
|
||||||
|
# )
|
||||||
|
# case {'return': pld}:
|
||||||
|
# return Return(
|
||||||
|
# cid=cid,
|
||||||
|
# pld=pld,
|
||||||
|
# )
|
||||||
|
|
||||||
|
# case {'error': pld}:
|
||||||
|
# return Error(
|
||||||
|
# cid=cid,
|
||||||
|
# pld=ErrorData(
|
||||||
|
# **pld
|
||||||
|
# ),
|
||||||
|
# )
|
||||||
|
|
||||||
|
# return (
|
||||||
|
# # enc_to_dict,
|
||||||
|
# dec_from_dict,
|
||||||
|
# )
|
||||||
|
|
Loading…
Reference in New Issue