forked from goodboy/tractor
More correct/explicit `.started()` send-side validation
In the sense that we handle it as a special case that exposed through to `RxPld.dec_msg()` with a new `is_started_send_side: bool`. (Non-ideal) `Context.started()` impl deats: - only do send-side pld-spec validation when a new `validate_pld_spec` is set (by default it's not). - call `self.pld_rx.dec_msg(is_started_send_side=True)` to validate the payload field from the just codec-ed `Started` msg's `msg_bytes` by passing the `roundtripped` msg (with it's `.pld: Raw`) directly. - add a `hide_tb: bool` param and proxy it to the `.dec_msg()` call. (Non-ideal) `PldRx.dec_msg()` impl deats: - for now we're packing the MTE inside an `Error` via a manual call to `pack_error()` and then setting that as the `msg` passed to `_raise_from_unexpected_msg()` (though really we should just raise inline?). - manually set the `MsgTypeError._ipc_msg` to the above.. Other, - more comprehensive `Context` type doc string. - various `hide_tb: bool` kwarg additions through `._ops.PldRx` meths. - proto a `.msg._ops.validate_payload_msg()` helper planned to get the logic from this version of `.started()`'s send-side validation so as to be useful more generally elsewhere.. (like for raising back `Return` values on the child side?). Warning: this commit may have been made out of order from required changes to `._exceptions` which will come in a follow up!runtime_to_msgspec
parent
c2cc12e14f
commit
42ba855d1b
|
@ -15,12 +15,22 @@
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
The fundamental cross process SC abstraction: an inter-actor,
|
The fundamental cross-process SC abstraction: an inter-actor,
|
||||||
cancel-scope linked task "context".
|
transitively cancel-scope linked, (dual) task IPC coupled "context".
|
||||||
|
|
||||||
A ``Context`` is very similar to the ``trio.Nursery.cancel_scope`` built
|
A `Context` is very similar to the look and feel of the
|
||||||
into each ``trio.Nursery`` except it links the lifetimes of memory space
|
`.cancel_scope: trio.CancelScope` built into each `trio.Nursery`
|
||||||
disjoint, parallel executing tasks in separate actors.
|
except that it links the lifetimes of 2 memory space disjoint,
|
||||||
|
parallel executing, tasks scheduled in separate "actors".
|
||||||
|
|
||||||
|
So while a `trio.Nursery` has a `.parent_task` which exists both
|
||||||
|
before (open) and then inside the body of the `async with` of the
|
||||||
|
nursery's scope (/block), a `Context` contains 2 tasks, a "parent"
|
||||||
|
and a "child" side, where both execute independently in separate
|
||||||
|
memory domains of different (host's) processes linked through
|
||||||
|
a SC-transitive IPC "shuttle dialog protocol". The underlying IPC
|
||||||
|
dialog-(un)protocol allows for the maintainance of SC properties
|
||||||
|
end-2-end between the tasks.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
@ -71,13 +81,11 @@ from .msg import (
|
||||||
MsgCodec,
|
MsgCodec,
|
||||||
NamespacePath,
|
NamespacePath,
|
||||||
PayloadT,
|
PayloadT,
|
||||||
Return,
|
|
||||||
Started,
|
Started,
|
||||||
Stop,
|
Stop,
|
||||||
Yield,
|
Yield,
|
||||||
current_codec,
|
current_codec,
|
||||||
pretty_struct,
|
pretty_struct,
|
||||||
types as msgtypes,
|
|
||||||
_ops as msgops,
|
_ops as msgops,
|
||||||
)
|
)
|
||||||
from ._ipc import (
|
from ._ipc import (
|
||||||
|
@ -90,7 +98,7 @@ from ._state import (
|
||||||
debug_mode,
|
debug_mode,
|
||||||
_ctxvar_Context,
|
_ctxvar_Context,
|
||||||
)
|
)
|
||||||
|
# ------ - ------
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
from ._runtime import Actor
|
from ._runtime import Actor
|
||||||
|
@ -1598,16 +1606,15 @@ class Context:
|
||||||
async def started(
|
async def started(
|
||||||
self,
|
self,
|
||||||
|
|
||||||
# TODO: how to type this so that it's the
|
|
||||||
# same as the payload type? Is this enough?
|
|
||||||
value: PayloadT|None = None,
|
value: PayloadT|None = None,
|
||||||
|
validate_pld_spec: bool = True,
|
||||||
|
strict_pld_parity: bool = False,
|
||||||
|
|
||||||
strict_parity: bool = False,
|
# TODO: this will always emit for msgpack for any () vs. []
|
||||||
|
# inside the value.. do we want to offer warnings on that?
|
||||||
|
# complain_no_parity: bool = False,
|
||||||
|
|
||||||
# TODO: this will always emit now that we do `.pld: Raw`
|
hide_tb: bool = True,
|
||||||
# passthrough.. so maybe just only complain when above strict
|
|
||||||
# flag is set?
|
|
||||||
complain_no_parity: bool = False,
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -1648,63 +1655,54 @@ class Context:
|
||||||
#
|
#
|
||||||
# https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern
|
# https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern
|
||||||
#
|
#
|
||||||
codec: MsgCodec = current_codec()
|
__tracebackhide__: bool = hide_tb
|
||||||
msg_bytes: bytes = codec.encode(started_msg)
|
if validate_pld_spec:
|
||||||
try:
|
# __tracebackhide__: bool = False
|
||||||
# be a "cheap" dialog (see above!)
|
codec: MsgCodec = current_codec()
|
||||||
if (
|
msg_bytes: bytes = codec.encode(started_msg)
|
||||||
strict_parity
|
try:
|
||||||
or
|
roundtripped: Started = codec.decode(msg_bytes)
|
||||||
complain_no_parity
|
# pld: PayloadT = await self.pld_rx.recv_pld(
|
||||||
):
|
pld: PayloadT = self.pld_rx.dec_msg(
|
||||||
rt_started: Started = codec.decode(msg_bytes)
|
msg=roundtripped,
|
||||||
|
ipc=self,
|
||||||
# XXX something is prolly totes cucked with the
|
expect_msg=Started,
|
||||||
# codec state!
|
hide_tb=hide_tb,
|
||||||
if isinstance(rt_started, dict):
|
is_started_send_side=True,
|
||||||
rt_started = msgtypes.from_dict_msg(
|
)
|
||||||
dict_msg=rt_started,
|
if (
|
||||||
)
|
strict_pld_parity
|
||||||
raise RuntimeError(
|
and
|
||||||
'Failed to roundtrip `Started` msg?\n'
|
pld != value
|
||||||
f'{pretty_struct.pformat(rt_started)}\n'
|
):
|
||||||
)
|
|
||||||
|
|
||||||
if rt_started != started_msg:
|
|
||||||
# TODO: make that one a mod func too..
|
# TODO: make that one a mod func too..
|
||||||
diff = pretty_struct.Struct.__sub__(
|
diff = pretty_struct.Struct.__sub__(
|
||||||
rt_started,
|
roundtripped,
|
||||||
started_msg,
|
started_msg,
|
||||||
)
|
)
|
||||||
complaint: str = (
|
complaint: str = (
|
||||||
'Started value does not match after roundtrip?\n\n'
|
'Started value does not match after roundtrip?\n\n'
|
||||||
f'{diff}'
|
f'{diff}'
|
||||||
)
|
)
|
||||||
|
raise ValidationError(complaint)
|
||||||
|
|
||||||
# TODO: rn this will pretty much always fail with
|
# raise any msg type error NO MATTER WHAT!
|
||||||
# any other sequence type embeded in the
|
except ValidationError as verr:
|
||||||
# payload...
|
# always show this src frame in the tb
|
||||||
if (
|
# __tracebackhide__: bool = False
|
||||||
self._strict_started
|
raise _mk_msg_type_err(
|
||||||
or
|
msg=roundtripped,
|
||||||
strict_parity
|
codec=codec,
|
||||||
):
|
src_validation_error=verr,
|
||||||
raise ValueError(complaint)
|
verb_header='Trying to send ',
|
||||||
else:
|
is_invalid_payload=True,
|
||||||
log.warning(complaint)
|
) from verr
|
||||||
|
|
||||||
await self.chan.send(started_msg)
|
# TODO: maybe a flag to by-pass encode op if already done
|
||||||
|
# here in caller?
|
||||||
# raise any msg type error NO MATTER WHAT!
|
await self.chan.send(started_msg)
|
||||||
except ValidationError as verr:
|
|
||||||
raise _mk_msg_type_err(
|
|
||||||
msg=msg_bytes,
|
|
||||||
codec=codec,
|
|
||||||
src_validation_error=verr,
|
|
||||||
verb_header='Trying to send payload'
|
|
||||||
# > 'invalid `Started IPC msgs\n'
|
|
||||||
) from verr
|
|
||||||
|
|
||||||
|
# set msg-related internal runtime-state
|
||||||
self._started_called = True
|
self._started_called = True
|
||||||
self._started_msg = started_msg
|
self._started_msg = started_msg
|
||||||
self._started_pld = value
|
self._started_pld = value
|
||||||
|
@ -1997,12 +1995,7 @@ async def open_context_from_portal(
|
||||||
|
|
||||||
pld_spec: TypeAlias|None = None,
|
pld_spec: TypeAlias|None = None,
|
||||||
allow_overruns: bool = False,
|
allow_overruns: bool = False,
|
||||||
|
hide_tb: bool = True,
|
||||||
# TODO: if we set this the wrapping `@acm` body will
|
|
||||||
# still be shown (awkwardly) on pdb REPL entry. Ideally
|
|
||||||
# we can similarly annotate that frame to NOT show? for now
|
|
||||||
# we DO SHOW this frame since it's awkward ow..
|
|
||||||
hide_tb: bool = False,
|
|
||||||
|
|
||||||
# proxied to RPC
|
# proxied to RPC
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
@ -2115,6 +2108,7 @@ async def open_context_from_portal(
|
||||||
ipc=ctx,
|
ipc=ctx,
|
||||||
expect_msg=Started,
|
expect_msg=Started,
|
||||||
passthrough_non_pld_msgs=False,
|
passthrough_non_pld_msgs=False,
|
||||||
|
hide_tb=hide_tb,
|
||||||
)
|
)
|
||||||
|
|
||||||
# from .devx import pause
|
# from .devx import pause
|
||||||
|
|
|
@ -47,7 +47,7 @@ from tractor._exceptions import (
|
||||||
_raise_from_unexpected_msg,
|
_raise_from_unexpected_msg,
|
||||||
MsgTypeError,
|
MsgTypeError,
|
||||||
_mk_msg_type_err,
|
_mk_msg_type_err,
|
||||||
pack_from_raise,
|
pack_error,
|
||||||
)
|
)
|
||||||
from tractor._state import current_ipc_ctx
|
from tractor._state import current_ipc_ctx
|
||||||
from ._codec import (
|
from ._codec import (
|
||||||
|
@ -203,7 +203,6 @@ class PldRx(Struct):
|
||||||
msg: MsgType = (
|
msg: MsgType = (
|
||||||
ipc_msg
|
ipc_msg
|
||||||
or
|
or
|
||||||
|
|
||||||
# async-rx msg from underlying IPC feeder (mem-)chan
|
# async-rx msg from underlying IPC feeder (mem-)chan
|
||||||
await ipc._rx_chan.receive()
|
await ipc._rx_chan.receive()
|
||||||
)
|
)
|
||||||
|
@ -223,6 +222,10 @@ class PldRx(Struct):
|
||||||
raise_error: bool = True,
|
raise_error: bool = True,
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = True,
|
||||||
|
|
||||||
|
# XXX for special (default?) case of send side call with
|
||||||
|
# `Context.started(validate_pld_spec=True)`
|
||||||
|
is_started_send_side: bool = False,
|
||||||
|
|
||||||
) -> PayloadT|Raw:
|
) -> PayloadT|Raw:
|
||||||
'''
|
'''
|
||||||
Decode a msg's payload field: `MsgType.pld: PayloadT|Raw` and
|
Decode a msg's payload field: `MsgType.pld: PayloadT|Raw` and
|
||||||
|
@ -230,8 +233,6 @@ class PldRx(Struct):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
_src_err = None
|
|
||||||
src_err: BaseException|None = None
|
src_err: BaseException|None = None
|
||||||
match msg:
|
match msg:
|
||||||
# payload-data shuttle msg; deliver the `.pld` value
|
# payload-data shuttle msg; deliver the `.pld` value
|
||||||
|
@ -256,18 +257,58 @@ class PldRx(Struct):
|
||||||
# pack mgterr into error-msg for
|
# pack mgterr into error-msg for
|
||||||
# reraise below; ensure remote-actor-err
|
# reraise below; ensure remote-actor-err
|
||||||
# info is displayed nicely?
|
# info is displayed nicely?
|
||||||
msgterr: MsgTypeError = _mk_msg_type_err(
|
mte: MsgTypeError = _mk_msg_type_err(
|
||||||
msg=msg,
|
msg=msg,
|
||||||
codec=self.pld_dec,
|
codec=self.pld_dec,
|
||||||
src_validation_error=valerr,
|
src_validation_error=valerr,
|
||||||
is_invalid_payload=True,
|
is_invalid_payload=True,
|
||||||
|
expected_msg=expect_msg,
|
||||||
|
# ipc_msg=msg,
|
||||||
)
|
)
|
||||||
msg: Error = pack_from_raise(
|
# NOTE: override the `msg` passed to
|
||||||
local_err=msgterr,
|
# `_raise_from_unexpected_msg()` (below) so so that
|
||||||
|
# we're effectively able to use that same func to
|
||||||
|
# unpack and raise an "emulated remote `Error`" of
|
||||||
|
# this local MTE.
|
||||||
|
err_msg: Error = pack_error(
|
||||||
|
exc=mte,
|
||||||
cid=msg.cid,
|
cid=msg.cid,
|
||||||
src_uid=ipc.chan.uid,
|
src_uid=(
|
||||||
|
ipc.chan.uid
|
||||||
|
if not is_started_send_side
|
||||||
|
else ipc._actor.uid
|
||||||
|
),
|
||||||
|
# tb=valerr.__traceback__,
|
||||||
|
tb_str=mte._message,
|
||||||
)
|
)
|
||||||
|
# ^-TODO-^ just raise this inline instead of all the
|
||||||
|
# pack-unpack-repack non-sense!
|
||||||
|
|
||||||
|
mte._ipc_msg = err_msg
|
||||||
|
msg = err_msg
|
||||||
|
|
||||||
|
# set emulated remote error more-or-less as the
|
||||||
|
# runtime would
|
||||||
|
ctx: Context = getattr(ipc, 'ctx', ipc)
|
||||||
|
|
||||||
|
# TODO: should we instead make this explicit and
|
||||||
|
# use the above masked `is_started_send_decode`,
|
||||||
|
# expecting the `Context.started()` caller to set
|
||||||
|
# it? Rn this is kinda, howyousayyy, implicitly
|
||||||
|
# edge-case-y..
|
||||||
|
if (
|
||||||
|
expect_msg is not Started
|
||||||
|
and not is_started_send_side
|
||||||
|
):
|
||||||
|
ctx._maybe_cancel_and_set_remote_error(mte)
|
||||||
|
|
||||||
|
# XXX NOTE: so when the `_raise_from_unexpected_msg()`
|
||||||
|
# raises the boxed `err_msg` from above it raises
|
||||||
|
# it from `None`.
|
||||||
src_err = valerr
|
src_err = valerr
|
||||||
|
# if is_started_send_side:
|
||||||
|
# src_err = None
|
||||||
|
|
||||||
|
|
||||||
# XXX some other decoder specific failure?
|
# XXX some other decoder specific failure?
|
||||||
# except TypeError as src_error:
|
# except TypeError as src_error:
|
||||||
|
@ -379,6 +420,7 @@ class PldRx(Struct):
|
||||||
# NOTE: generally speaking only for handling `Stop`-msgs that
|
# NOTE: generally speaking only for handling `Stop`-msgs that
|
||||||
# arrive during a call to `drain_to_final_msg()` above!
|
# arrive during a call to `drain_to_final_msg()` above!
|
||||||
passthrough_non_pld_msgs: bool = True,
|
passthrough_non_pld_msgs: bool = True,
|
||||||
|
hide_tb: bool = True,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> tuple[MsgType, PayloadT]:
|
) -> tuple[MsgType, PayloadT]:
|
||||||
|
@ -387,6 +429,7 @@ class PldRx(Struct):
|
||||||
the pair of refs.
|
the pair of refs.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
__tracebackhide__: bool = hide_tb
|
||||||
msg: MsgType = await ipc._rx_chan.receive()
|
msg: MsgType = await ipc._rx_chan.receive()
|
||||||
|
|
||||||
if passthrough_non_pld_msgs:
|
if passthrough_non_pld_msgs:
|
||||||
|
@ -401,6 +444,7 @@ class PldRx(Struct):
|
||||||
msg,
|
msg,
|
||||||
ipc=ipc,
|
ipc=ipc,
|
||||||
expect_msg=expect_msg,
|
expect_msg=expect_msg,
|
||||||
|
hide_tb=hide_tb,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
)
|
)
|
||||||
return msg, pld
|
return msg, pld
|
||||||
|
@ -414,7 +458,7 @@ def limit_plds(
|
||||||
) -> MsgDec:
|
) -> MsgDec:
|
||||||
'''
|
'''
|
||||||
Apply a `MsgCodec` that will natively decode the SC-msg set's
|
Apply a `MsgCodec` that will natively decode the SC-msg set's
|
||||||
`Msg.pld: Union[Type[Struct]]` payload fields using
|
`PayloadMsg.pld: Union[Type[Struct]]` payload fields using
|
||||||
tagged-unions of `msgspec.Struct`s from the `payload_types`
|
tagged-unions of `msgspec.Struct`s from the `payload_types`
|
||||||
for all IPC contexts in use by the current `trio.Task`.
|
for all IPC contexts in use by the current `trio.Task`.
|
||||||
|
|
||||||
|
@ -691,3 +735,11 @@ async def drain_to_final_msg(
|
||||||
result_msg,
|
result_msg,
|
||||||
pre_result_drained,
|
pre_result_drained,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: factor logic from `.Context.started()` for send-side
|
||||||
|
# validate raising!
|
||||||
|
def validate_payload_msg(
|
||||||
|
msg: Started|Yield|Return,
|
||||||
|
) -> MsgTypeError|None:
|
||||||
|
...
|
||||||
|
|
Loading…
Reference in New Issue