From a354732a9e1fdd9686376ed9a0ae59f95c691965 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 May 2024 08:50:16 -0400 Subject: [PATCH] Allow `Stop` passthrough from `PldRx.recv_msg_w_pld()` Since we need to allow it (at the least) inside `drain_until_final_msg()` for handling stream-phase termination races where we don't want to have to handle a raised error from something like `Context.result()`. Expose the passthrough option via a `passthrough_non_pld_msgs: bool` kwarg. Add comprehensive comment to `current_pldrx()`. --- tractor/msg/_ops.py | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/tractor/msg/_ops.py b/tractor/msg/_ops.py index 1ba623d..3b0b833 100644 --- a/tractor/msg/_ops.py +++ b/tractor/msg/_ops.py @@ -355,6 +355,9 @@ class PldRx(Struct): ipc: Context|MsgStream, expect_msg: MsgType, + # NOTE: generally speaking only for handling `Stop`-msgs that + # arrive during a call to `drain_to_final_msg()` above! + passthrough_non_pld_msgs: bool = True, **kwargs, ) -> tuple[MsgType, PayloadT]: @@ -365,6 +368,11 @@ class PldRx(Struct): ''' msg: MsgType = await ipc._rx_chan.receive() + if passthrough_non_pld_msgs: + match msg: + case Stop(): + return msg, None + # TODO: is there some way we can inject the decoded # payload into an existing output buffer for the original # msg instance? @@ -389,15 +397,30 @@ _ctxvar_PldRx: ContextVar[PldRx] = ContextVar( def current_pldrx() -> PldRx: ''' - Return the current `trio.Task.context`'s msg-payload - receiver, the post IPC but pre-app code `MsgType.pld` - filter. + Return the current `trio.Task.context`'s msg-payload-receiver. + + A payload receiver is the IPC-msg processing sub-sys which + filters inter-actor-task communicated payload data, i.e. the + `PayloadMsg.pld: PayloadT` field value, AFTER it's container + shuttlle msg (eg. `Started`/`Yield`/`Return) has been delivered + up from `tractor`'s transport layer but BEFORE the data is + yielded to application code, normally via an IPC primitive API + like, for ex., `pld_data: PayloadT = MsgStream.receive()`. Modification of the current payload spec via `limit_plds()` - allows an application to contextually filter typed IPC msg - content delivered via wire transport. + allows a `tractor` application to contextually filter IPC + payload content with a type specification as supported by + the interchange backend. + + - for `msgspec` see . + + NOTE that the `PldRx` itself is a per-`Context` global sub-system + that normally does not change other then the applied pld-spec + for the current `trio.Task`. ''' + # ctx: context = current_ipc_ctx() + # return ctx._pld_rx return _ctxvar_PldRx.get()