Allow `Stop` passthrough from `PldRx.recv_msg_w_pld()`

Since we need to allow it (at the least) inside
`drain_until_final_msg()` for handling stream-phase termination races
where we don't want to have to handle a raised error from something like
`Context.result()`. Expose the passthrough option via
a `passthrough_non_pld_msgs: bool` kwarg.

Add comprehensive comment to `current_pldrx()`.
runtime_to_msgspec
Tyler Goodlet 2024-05-08 08:50:16 -04:00
parent fbc21a1dec
commit a354732a9e
1 changed files with 28 additions and 5 deletions

View File

@ -355,6 +355,9 @@ class PldRx(Struct):
ipc: Context|MsgStream,
expect_msg: MsgType,
# NOTE: generally speaking only for handling `Stop`-msgs that
# arrive during a call to `drain_to_final_msg()` above!
passthrough_non_pld_msgs: bool = True,
**kwargs,
) -> tuple[MsgType, PayloadT]:
@ -365,6 +368,11 @@ class PldRx(Struct):
'''
msg: MsgType = await ipc._rx_chan.receive()
if passthrough_non_pld_msgs:
match msg:
case Stop():
return msg, None
# TODO: is there some way we can inject the decoded
# payload into an existing output buffer for the original
# msg instance?
@ -389,15 +397,30 @@ _ctxvar_PldRx: ContextVar[PldRx] = ContextVar(
def current_pldrx() -> PldRx:
'''
Return the current `trio.Task.context`'s msg-payload
receiver, the post IPC but pre-app code `MsgType.pld`
filter.
Return the current `trio.Task.context`'s msg-payload-receiver.
A payload receiver is the IPC-msg processing sub-sys which
filters inter-actor-task communicated payload data, i.e. the
`PayloadMsg.pld: PayloadT` field value, AFTER it's container
shuttlle msg (eg. `Started`/`Yield`/`Return) has been delivered
up from `tractor`'s transport layer but BEFORE the data is
yielded to application code, normally via an IPC primitive API
like, for ex., `pld_data: PayloadT = MsgStream.receive()`.
Modification of the current payload spec via `limit_plds()`
allows an application to contextually filter typed IPC msg
content delivered via wire transport.
allows a `tractor` application to contextually filter IPC
payload content with a type specification as supported by
the interchange backend.
- for `msgspec` see <PUTLINKHERE>.
NOTE that the `PldRx` itself is a per-`Context` global sub-system
that normally does not change other then the applied pld-spec
for the current `trio.Task`.
'''
# ctx: context = current_ipc_ctx()
# return ctx._pld_rx
return _ctxvar_PldRx.get()