Commit Graph

19 Commits (e94020133cc6f2e2e9404c2a897f9ccfe0e1c860)

Author SHA1 Message Date
Tyler Goodlet 8fa23fcccd Raise RTE from `limit_plds()` on no `curr_ctx`
Since it should only be used from within a `Portal.open_context()`
scope, make sure the caller knows that!

Also don't hide the frame in tb if the immediate function errors..
2025-03-10 12:18:26 -04:00
Tyler Goodlet 61f6690261 Finally get type-extended `msgspec` fields workinn
By using our new `PldRx` design we can,
- pass through the pld-spec & a `dec_hook()` to our `MsgDec` which is
  used to configure the underlying `.dec: msgspec.msgpack.Decoder`
- pass through a `enc_hook()` to `mk_codec()` and use it to conf the
  equiv `MsgCodec.enc` such that sent msg-plds are converted prior
  to transport.

The trick ended up being just to always union the `mk_dec()`
extension-types spec with the normaly with the `msgspec.Raw` pld-spec
such that the `dec_hook()` is only invoked for payload types tagged
by the encoder/sender side B)

A variety of impl tweaks to make it all happen as well as various
cleanups in the `.msg._codec` mod include,

- `mk_dec()` no defaul `spec` arg, better doc string, accept the new
  `ext_types` arg, doing the union of that with `msgspec.Raw`.
- proto-ed a now unused `mk_boxed_ext_struct()` which will likely get
  removed since it ended up that our `PayloadMsg` structs already cover
  the ext-type-hook requirement that the decoder is passed
  a `.type=msgspec.Struct` of some sort in order for `.dec_hook` to be
  used.
- add a `unpack_spec_types()` util fn for getting the `set[Type]` from
  from a `Union[Type]` annotation instance.
- mk the default `mk_codec(pc_pld_spec = Raw,)` since the `PldRx` design
  was already passing/overriding it and it doesn't make much sense to
  use `Any` anymore for the same reason; it will cause various `Context`
  apis to now break.
  |_ also accept a `enc_hook()` and `ext_types` which are used to maybe
     config the `.msgpack.Encoder`
- generally tweak a bunch of comments-as-docs and todos namely the ones
  that are completed after the pld-rx design was implemented.

Also,
- mask the non-functioning `'defstruct'` approach `inside
  `.msg.types.mk_msg_spec()` to prep for its removal.

Adjust the test suite (rn called `test_caps_based_msging`),
- add a new suite `test_custom_extension_types` and move and
  use the `enc/dec_nsp()` hooks to the mod level for its use.
- prolly planning to drop the `test_limit_msgspec` suite since it's
  mostly replaced by the `test_pldrx_limiting` mod's version?
- originally was tweaking a bunch in `test_codec_hooks_mod` but likely
  it will get mostly rewritten to be simpler and simply verify that
  ext-typed fields can be used over IPC `Context`s between actors (as
  originally intended for this sub-suite).
2025-03-10 12:18:26 -04:00
Tyler Goodlet b45874a216 Show frames when decode is handed bad input 2025-03-10 12:17:53 -04:00
Tyler Goodlet 7e1ad860cd Fix `roundtripped` ref error in `validate_payload_msg()` 2025-03-10 12:17:53 -04:00
Tyler Goodlet 5c2e972315 Report any external-rent-task-canceller during msg-drain
As in whenever `Context.cancel()` is not (runtime internally) called
(i.e. `._cancel_called` is not set), we can attempt to detect the parent
`trio` nursery/cancel-scope that is the source. Emit the report with
a `.cancel()` level and attempt to repr in "sclang" form as well as
unhide the stack frame for debug/traceback-in.
2024-08-26 14:29:09 -04:00
Tyler Goodlet b56352b0e4 Quieter `Stop` handling on ctx result capture
In the `drain_to_final_msg()` impl, since a stream terminating
gracefully requires this msg, there's really no reason to `log.cancel()`
about it; go `.runtime()` level instead since we're trying de-noise
under "normal operation".

Also,
- passthrough `hide_tb` to taskc-handler's `ctx.maybe_raise()` call.
- raise `MessagingError` for the `MsgType` unmatched `case _:`.
- detail the doc string motivation a little more.
2024-07-03 22:42:32 -04:00
Tyler Goodlet 711f639fc5 Break `_mk_msg_type_err()` into recv/send side funcs
Name them `_mk_send_mte()`/`_mk_recv_mte()` and change the runtime to
call each appropriately depending on location/usage.

Also add some dynamic call-frame "unhide" blocks such that when we
expect raised MTE from the aboves calls but we get a different
unexpected error from the runtime, we ensure the call stack downward is
shown in tbs/pdb.
|_ ideally in the longer run we come up with a fancier dynamic sys for
   this, prolly something in `.devx._frame_stack`?
2024-06-17 13:12:16 -04:00
Tyler Goodlet 04bd111037 Proxy through `dec_hook` in `.limit_plds()` APIs 2024-06-17 09:23:31 -04:00
Tyler Goodlet 13ea500a44 Rename `PldRx.dec_msg()` -> `.decode_pld()`
Keep the old alias, but i think it's better form to use longer names for
internal public APIs and this name better reflects the functionality:
decoding and returning a `PayloadMsg.pld` field.
2024-05-30 16:09:59 -04:00
Tyler Goodlet 28af4749cc Don't need to pack an `Error` with send-side MTEs 2024-05-30 09:05:23 -04:00
Tyler Goodlet a1b124b62b Raise send-side MTEs inline in `PldRx.dec_msg()`
So when `is_started_send_side is True` we raise the newly created
`MsgTypeError` (MTE) directly instead of doing all the `Error`-msg pack
and unpack to raise stuff via `_raise_from_unexpected_msg()` since the
raise should happen send side anyway and so doesn't emulate any remote
fault like in a bad `Return` or `Started` without send-side pld-spec
validation.

Oh, and proxy-through the `hide_tb: bool` input from `.drain_to_final_msg()`
to `.recv_msg_w_pld()`.
2024-05-28 16:02:51 -04:00
Tyler Goodlet 6c2efc96dc Factor `.started()` validation into `.msg._ops`
Filling out the helper `validate_payload_msg()` staged in a prior commit
and adjusting all imports to match.

Also add a `raise_mte: bool` flag for potential usage where the caller
wants to handle the MTE instance themselves.
2024-05-28 11:08:27 -04:00
Tyler Goodlet 42ba855d1b More correct/explicit `.started()` send-side validation
In the sense that we handle it as a special case that exposed
through to `RxPld.dec_msg()` with a new `is_started_send_side: bool`.

(Non-ideal) `Context.started()` impl deats:
- only do send-side pld-spec validation when a new `validate_pld_spec`
  is set (by default it's not).
- call `self.pld_rx.dec_msg(is_started_send_side=True)` to validate the
  payload field from the just codec-ed `Started` msg's `msg_bytes` by
  passing the `roundtripped` msg (with it's `.pld: Raw`) directly.
- add a `hide_tb: bool` param and proxy it to the `.dec_msg()` call.

(Non-ideal) `PldRx.dec_msg()` impl deats:
- for now we're packing the MTE inside an `Error` via a manual call to
  `pack_error()` and then setting that as the `msg` passed to
  `_raise_from_unexpected_msg()` (though really we should just raise
  inline?).
- manually set the `MsgTypeError._ipc_msg` to the above..

Other,
- more comprehensive `Context` type doc string.
- various `hide_tb: bool` kwarg additions through `._ops.PldRx` meths.
- proto a `.msg._ops.validate_payload_msg()` helper planned to get the
  logic from this version of `.started()`'s send-side validation so as
  to be useful more generally elsewhere.. (like for raising back
  `Return` values on the child side?).

Warning: this commit may have been made out of order from required
changes to `._exceptions` which will come in a follow up!
2024-05-27 14:59:40 -04:00
Tyler Goodlet 262a0e36c6 Allocate a `PldRx` per `Context`, new pld-spec API
Since the state mgmt becomes quite messy with multiple sub-tasks inside
an IPC ctx, AND bc generally speaking the payload-type-spec should map
1-to-1 with the `Context`, it doesn't make a lot of sense to be using
`ContextVar`s to modify the `Context.pld_rx: PldRx` instance.

Instead, always allocate a full instance inside `mk_context()` with the
default `.pld_rx: PldRx` set to use the `msg._ops._def_any_pldec: MsgDec`

In support, simplify the `.msg._ops` impl and APIs:
- drop `_ctxvar_PldRx`, `_def_pld_rx` and `current_pldrx()`.
- rename `PldRx._pldec` -> `._pld_dec`.
- rename the unused `PldRx.apply_to_ipc()` -> `.wraps_ipc()`.
- add a required `PldRx._ctx: Context` attr since it is needed
  internally in some meths and each pld-rx now maps to a specific ctx.
- modify all recv methods to accept a `ipc: Context|MsgStream` (instead
  of a `ctx` arg) since both have a ref to the same `._rx_chan` and there
  are only a couple spots (in `.dec_msg()`) where we need the `ctx`
  explicitly (which can now be easily accessed via a new `MsgStream.ctx`
  property, see below).
- always show the `.dec_msg()` frame in tbs if there's a reference error
  when calling `_raise_from_unexpected_msg()` in the fallthrough case.
- implement `limit_plds()` as light wrapper around getting the
  `current_ipc_ctx()` and mutating its `MsgDec` via
  `Context.pld_rx.limit_plds()`.
- add a `maybe_limit_plds()` which just provides an `@acm` equivalent of
  `limit_plds()` handy for composing in a `async with ():` style block
  (avoiding additional indent levels in the body of async funcs).

Obvi extend the `Context` and `MsgStream` interfaces as needed
to match the above:
- add a `Context.pld_rx` pub prop.
- new private refs to `Context._started_msg: Started` and
  a `._started_pld` (mostly for internal debugging / testing / logging)
  and set inside `.open_context()` immediately after the syncing phase.
- a `Context.has_outcome() -> bool:` predicate which can be used to more
  easily determine if the ctx errored or has a final result.
- pub props for `MsgStream.ctx: Context` and `.chan: Channel` providing
  full `ipc`-arg compat with the `PldRx` method signatures.
2024-05-20 15:46:28 -04:00
Tyler Goodlet a354732a9e Allow `Stop` passthrough from `PldRx.recv_msg_w_pld()`
Since we need to allow it (at the least) inside
`drain_until_final_msg()` for handling stream-phase termination races
where we don't want to have to handle a raised error from something like
`Context.result()`. Expose the passthrough option via
a `passthrough_non_pld_msgs: bool` kwarg.

Add comprehensive comment to `current_pldrx()`.
2024-05-08 08:50:16 -04:00
Tyler Goodlet b278164f83 Mk `drain_to_final_msg()` never raise from `Error`
Since we usually want them raised from some (internal) call to
`Context.maybe_raise()` and NOT directly from the drainage call, make it
possible via a new `raise_error: bool` to both `PldRx.recv_msg_w_pld()`
and `.dec_msg()`.

In support,
- rename `return_msg` -> `result_msg` since we expect to return
  `Error`s.
- do a `result_msg` assign and `break` in the `case Error()`.
- add `**dec_msg_kwargs` passthrough for other `.dec_msg()` calling
  methods.

Other,
- drop/aggregate todo-notes around the main loop's
  `ctx._pld_rx.recv_msg_w_pld()` call.
- add (configurable) frame hiding to most payload receive meths.
2024-05-06 13:43:51 -04:00
Tyler Goodlet 30c5896d26 Fix attr name error, use public `MsgDec.dec` 2024-04-30 12:55:46 -04:00
Tyler Goodlet a3429268ea First draft payload-spec limit API
Add new task-scope oriented `PldRx.pld_spec` management API similar to
`.msg._codec.limit_msg_spec()`, but obvi built to process and filter
`MsgType.pld` values.

New API related changes include:
- new per-task singleton getter `msg._ops.current_pldrx()` which
  delivers the current (global) payload receiver via a new
  `_ctxvar_PldRx: ContextVar` configured with a default
  `_def_any_pldec: MsgDec[Any]` decoder.
- a `PldRx.limit_plds()` which sets the decoder (`.type` underneath)
  for the specific payload rx instance.
- `.msg._ops.limit_plds()` which obtains the current task-scoped `PldRx`
  and applies the pld spec via a new `PldRx.limit_plds()`.
- rename `PldRx._msgdec` -> `._pldec`.
- add `.pld_dec` as pub attr for -^

Unrelated adjustments:
- use `.msg.pretty_struct.pformat()` where handy.
- always pass `expect_msg: MsgType`.
- add a `case Stop()` to `PldRx.dec_msg()` which will `log.warning()`
  when a stop is received by no stream was open on this receiving side
  since we rarely want that to raise since it's prolly just a runtime
  race or mistake in user code.

Other:
2024-04-26 15:29:50 -04:00
Tyler Goodlet 5eb9144921 First draft "payload receiver in a new `.msg._ops`
As per much tinkering, re-designs and preceding rubber-ducking via many
"commit msg novelas", **finally** this adds the (hopefully) final
missing layer for typed msg safety: `tractor.msg._ops.PldRx`

(or `PayloadReceiver`? haven't decided how verbose to go..)

Design justification summary:
      ------ - ------
- need a way to be as-close-as-possible to the `tractor`-application
  such that when `MsgType.pld: PayloadT` validation takes place, it is
  straightforward and obvious how user code can decide to handle any
  resulting `MsgTypeError`.
- there should be a common and optional-yet-modular way to modify
  **how** data delivered via IPC (possibly embedded as user defined,
  type-constrained `.pld: msgspec.Struct`s) can be handled and processed
  during fault conditions and/or IPC "msg attacks".
- support for nested type constraints within a `MsgType.pld` field
  should be simple to define, implement and understand at runtime.
- a layer between the app-level IPC primitive APIs
  (`Context`/`MsgStream`) and application-task code (consumer code of
  those APIs) should be easily customized and prove-to-be-as-such
  through demonstrably rigorous internal (sub-sys) use!
  -> eg. via seemless runtime RPC eps support like `Actor.cancel()`
  -> by correctly implementing our `.devx._debug.Lock` REPL TTY mgmt
    dialog prot, via a dead simple payload-as-ctl-msg-spec.

There are some fairly detailed doc strings included so I won't duplicate
that content, the majority of the work here is actually somewhat of
a factoring of many similar blocks that are doing more or less the same
`msg = await Context._rx_chan.receive()` with boilerplate for
`Error`/`Stop` handling via `_raise_from_no_key_in_msg()`. The new
`PldRx` basically provides a shim layer for this common "receive msg,
decode its payload, yield it up to the consuming app task" by pairing
the RPC feeder mem-chan with a msg-payload decoder and expecting IPC API
internals to use **one** API instead of re-implementing the same pattern
all over the place XD

`PldRx` breakdown
 ------ - ------
- for now only expects a `._msgdec: MsgDec` which allows for
  override-able `MsgType.pld` validation and most obviously used in
  the impl of `.dec_msg()`, the decode message method.
- provides multiple mem-chan receive options including:
 |_ `.recv_pld()` which does the e2e operation of receiving a payload
    item.
 |_ a sync `.recv_pld_nowait()` version.
 |_ a `.recv_msg_w_pld()` which optionally allows retreiving both the
    shuttling `MsgType` as well as it's `.pld` body for use cases where
    info on both is important (eg. draining a `MsgStream`).

Dirty internal changeover/implementation deatz:
             ------ - ------
- obvi move over all the IPC "primitives" that previously had the duplicate recv-n-yield
  logic:
 - `MsgStream.receive[_nowait]()` delegating instead to the equivalent
   `PldRx.recv_pld[_nowait]()`.
 - add `Context._pld_rx: PldRx`, created and passed in by
   `mk_context()`; use it for the `.started()` -> `first: Started`
   retrieval inside `open_context_from_portal()`.
 - all the relevant `Portal` invocation methods: `.result()`,
   `.run_from_ns()`, `.run()`; also allows for dropping `_unwrap_msg()`
   and `.Portal_return_once()` outright Bo
- rename `Context.ctx._recv_chan` -> `._rx_chan`.
- add detailed `Context._scope` info for logging whether or not it's
  cancelled inside `_maybe_cancel_and_set_remote_error()`.
- move `._context._drain_to_final_msg()` -> `._ops.drain_to_final_msg()`
  since it's really not necessarily ctx specific per say, and it does
  kinda fit with "msg operations" more abstractly ;)
2024-04-24 00:59:41 -04:00