Compare commits

...

6 Commits

Author SHA1 Message Date
Tyler Goodlet 5497401920 Add `MsgStream._stop_msg` use new `PldRx` API
In particular ensuring we use `ctx._pld_rx.recv_msg_nowait()` from
`.receive_nowait()` (which is called from `.aclose()`) such that we
ALWAYS (can) set the surrounding `Context._result/._outcome_msg` attrs
on reception of a final `Return`!!

This fixes a final stream-teardown-race-condition-bug where prior we
normally didn't set the `Context._result/._outcome_msg` in such cases.
This is **precisely because**  `.receive_nowait()` only returns the
`pld` and when called from `.aclose()` this value is discarded, meaning
so is its boxing `Return` despite consuming it from the underlying
`._rx_chan`..

Longer term this should be solved differently by ensuring such races
cases are handled at a higher scope like inside `Context._deliver_msg()`
or the `Portal.open_context()` enter/exit blocks? Add a detailed warning
note and todos for all this around the special case block!
2025-03-12 16:39:52 -04:00
Tyler Goodlet 03c447df0d Add `Context._outcome_msg` use new `PldRx` API
Such that any `Return` is always capture for each ctx instance and set
in `._deliver_msg()` normally; ensures we can at least introspect for it
when missing (like in a recently discovered stream teardown race bug).
Yes this augments the already existing `._result` which is dedicated for
the `._outcome_msg.pld` in the non-error case; we might want to see if
there's a nicer way to directly proxy ref to that without getting the
pre-pld-decoded `Raw` form with `msgspec`?

Also use the new `ctx._pld_rx.recv_msg()` and drop assigning
`pld_rx._ctx`.
2025-03-12 15:15:30 -04:00
Tyler Goodlet 1ce99ae742 Slight `PldRx` rework to simplify
Namely renaming and tweaking the `MsgType` receiving methods,
- `.recv_msg()` from what was `.recv_msg_w_pld()` which both receives
  the IPC msg from the underlying `._rx_chan` and then decodes its
  payload with `.decode_pld()`; it now also log reports on the different
  "stage of SC dialog protocol" msg types via a `match/case`.
- a new `.recv_msg_nowait()` sync equivalent of ^ (*was*
  `.recv_pld_nowait()`) who's use was the source of a recently
  discovered bug where any final `Return.pld` is being
  consumed-n-discarded by by `MsgStream.aclose()` depending on
  ctx/stream teardown race conditions..

Also,
- remove all the "instance persistent" ipc-ctx attrs, specifically the
  optional `_ipc`, `_ctx` and the `.wraps_ipc()` cm, since none of them
  were ever really needed/used; all methods which require
  a `Context/MsgStream` are explicitly always passed.
- update a buncha typing namely to use the more generic-styled
  `PayloadT` over `Any` and obviously `MsgType[PayloadT]`.
2025-03-12 13:49:58 -04:00
Tyler Goodlet 96826854b7 Rename ext-types with `msgspec` suite module 2025-03-12 13:47:53 -04:00
Tyler Goodlet 434577953a Complete rename to parent->child IPC ctx peers
Now changed in all comments docs **and** test-code content such that we
aren't using the "caller"->"callee" semantics anymore.
2025-03-12 13:15:48 -04:00
Tyler Goodlet b6b001faad Mk `tests/__init__.py`, not sure where it went?
I must have had a local touched file but never committed or something?
Seems that new `pytest` requires a top level `tests` pkg in order for
relative `.conftest` imports to work.
2025-03-12 13:13:20 -04:00
10 changed files with 255 additions and 175 deletions

View File

View File

@ -22,7 +22,7 @@ from tractor.devx._debug import (
_repl_fail_msg as _repl_fail_msg,
_ctlc_ignore_header as _ctlc_ignore_header,
)
from conftest import (
from ..conftest import (
_ci_env,
)

View File

@ -14,7 +14,7 @@ import tractor
from tractor._testing import (
tractor_test,
)
from conftest import no_windows
from .conftest import no_windows
def is_win():

View File

@ -38,9 +38,9 @@ from tractor._testing import (
# - standard setup/teardown:
# ``Portal.open_context()`` starts a new
# remote task context in another actor. The target actor's task must
# call ``Context.started()`` to unblock this entry on the caller side.
# the callee task executes until complete and returns a final value
# which is delivered to the caller side and retreived via
# call ``Context.started()`` to unblock this entry on the parent side.
# the child task executes until complete and returns a final value
# which is delivered to the parent side and retreived via
# ``Context.result()``.
# - cancel termination:
@ -170,9 +170,9 @@ async def assert_state(value: bool):
[False, ValueError, KeyboardInterrupt],
)
@pytest.mark.parametrize(
'callee_blocks_forever',
'child_blocks_forever',
[False, True],
ids=lambda item: f'callee_blocks_forever={item}'
ids=lambda item: f'child_blocks_forever={item}'
)
@pytest.mark.parametrize(
'pointlessly_open_stream',
@ -181,7 +181,7 @@ async def assert_state(value: bool):
)
def test_simple_context(
error_parent,
callee_blocks_forever,
child_blocks_forever,
pointlessly_open_stream,
debug_mode: bool,
):
@ -204,13 +204,13 @@ def test_simple_context(
portal.open_context(
simple_setup_teardown,
data=10,
block_forever=callee_blocks_forever,
block_forever=child_blocks_forever,
) as (ctx, sent),
):
assert current_ipc_ctx() is ctx
assert sent == 11
if callee_blocks_forever:
if child_blocks_forever:
await portal.run(assert_state, value=True)
else:
assert await ctx.result() == 'yo'
@ -220,7 +220,7 @@ def test_simple_context(
if error_parent:
raise error_parent
if callee_blocks_forever:
if child_blocks_forever:
await ctx.cancel()
else:
# in this case the stream will send a
@ -259,9 +259,9 @@ def test_simple_context(
@pytest.mark.parametrize(
'callee_returns_early',
'child_returns_early',
[True, False],
ids=lambda item: f'callee_returns_early={item}'
ids=lambda item: f'child_returns_early={item}'
)
@pytest.mark.parametrize(
'cancel_method',
@ -273,14 +273,14 @@ def test_simple_context(
[True, False],
ids=lambda item: f'chk_ctx_result_before_exit={item}'
)
def test_caller_cancels(
def test_parent_cancels(
cancel_method: str,
chk_ctx_result_before_exit: bool,
callee_returns_early: bool,
child_returns_early: bool,
debug_mode: bool,
):
'''
Verify that when the opening side of a context (aka the caller)
Verify that when the opening side of a context (aka the parent)
cancels that context, the ctx does not raise a cancelled when
either calling `.result()` or on context exit.
@ -294,7 +294,7 @@ def test_caller_cancels(
if (
cancel_method == 'portal'
and not callee_returns_early
and not child_returns_early
):
try:
res = await ctx.result()
@ -318,7 +318,7 @@ def test_caller_cancels(
pytest.fail(f'should not have raised ctxc\n{ctxc}')
# we actually get a result
if callee_returns_early:
if child_returns_early:
assert res == 'yo'
assert ctx.outcome is res
assert ctx.maybe_error is None
@ -362,14 +362,14 @@ def test_caller_cancels(
)
timeout: float = (
0.5
if not callee_returns_early
if not child_returns_early
else 2
)
with trio.fail_after(timeout):
async with (
expect_ctxc(
yay=(
not callee_returns_early
not child_returns_early
and cancel_method == 'portal'
)
),
@ -377,13 +377,13 @@ def test_caller_cancels(
portal.open_context(
simple_setup_teardown,
data=10,
block_forever=not callee_returns_early,
block_forever=not child_returns_early,
) as (ctx, sent),
):
if callee_returns_early:
if child_returns_early:
# ensure we block long enough before sending
# a cancel such that the callee has already
# a cancel such that the child has already
# returned it's result.
await trio.sleep(0.5)
@ -421,7 +421,7 @@ def test_caller_cancels(
# which should in turn cause `ctx._scope` to
# catch any cancellation?
if (
not callee_returns_early
not child_returns_early
and cancel_method != 'portal'
):
assert not ctx._scope.cancelled_caught
@ -430,11 +430,11 @@ def test_caller_cancels(
# basic stream terminations:
# - callee context closes without using stream
# - caller context closes without using stream
# - caller context calls `Context.cancel()` while streaming
# is ongoing resulting in callee being cancelled
# - callee calls `Context.cancel()` while streaming and caller
# - child context closes without using stream
# - parent context closes without using stream
# - parent context calls `Context.cancel()` while streaming
# is ongoing resulting in child being cancelled
# - child calls `Context.cancel()` while streaming and parent
# sees stream terminated in `RemoteActorError`
# TODO: future possible features
@ -470,7 +470,7 @@ async def test_child_exits_ctx_after_stream_open(
parent_send_before_receive: bool,
):
'''
callee context closes without using stream.
child context closes without using stream.
This should result in a msg sequence
|_<root>_
@ -485,13 +485,7 @@ async def test_child_exits_ctx_after_stream_open(
'''
timeout: float = (
0.5 if (
not debug_mode
# NOTE, for debugging final
# Return-consumed-n-discarded-ishue!
# and
# not parent_send_before_receive
) else 999
0.5 if not debug_mode else 999
)
async with tractor.open_nursery(
debug_mode=debug_mode,
@ -602,7 +596,7 @@ async def expect_cancelled(
raise
else:
assert 0, "callee wasn't cancelled !?"
assert 0, "child wasn't cancelled !?"
@pytest.mark.parametrize(
@ -857,7 +851,7 @@ async def test_child_cancels_before_started(
debug_mode: bool,
):
'''
Callee calls `Context.cancel()` while streaming and caller
Callee calls `Context.cancel()` while streaming and parent
sees stream terminated in `ContextCancelled`.
'''
@ -910,7 +904,7 @@ async def keep_sending_from_child(
) -> None:
'''
Send endlessly on the calleee stream.
Send endlessly on the child stream.
'''
await ctx.started()
@ -918,7 +912,7 @@ async def keep_sending_from_child(
msg_buffer_size=msg_buffer_size,
) as stream:
for msg in count():
print(f'callee sending {msg}')
print(f'child sending {msg}')
await stream.send(msg)
await trio.sleep(0.01)
@ -926,12 +920,12 @@ async def keep_sending_from_child(
@pytest.mark.parametrize(
'overrun_by',
[
('caller', 1, never_open_stream),
('callee', 0, keep_sending_from_child),
('parent', 1, never_open_stream),
('child', 0, keep_sending_from_child),
],
ids=[
('caller_1buf_never_open_stream'),
('callee_0buf_keep_sending_from_callee'),
('parent_1buf_never_open_stream'),
('child_0buf_keep_sending_from_child'),
]
)
def test_one_end_stream_not_opened(
@ -962,8 +956,7 @@ def test_one_end_stream_not_opened(
) as (ctx, sent):
assert sent is None
if 'caller' in overrunner:
if 'parent' in overrunner:
async with ctx.open_stream() as stream:
# itersend +1 msg more then the buffer size
@ -978,7 +971,7 @@ def test_one_end_stream_not_opened(
await trio.sleep_forever()
else:
# callee overruns caller case so we do nothing here
# child overruns parent case so we do nothing here
await trio.sleep_forever()
await portal.cancel_actor()
@ -986,19 +979,19 @@ def test_one_end_stream_not_opened(
# 2 overrun cases and the no overrun case (which pushes right up to
# the msg limit)
if (
overrunner == 'caller'
overrunner == 'parent'
):
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
assert excinfo.value.boxed_type == StreamOverrun
elif overrunner == 'callee':
elif overrunner == 'child':
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
# TODO: embedded remote errors so that we can verify the source
# error? the callee delivers an error which is an overrun
# error? the child delivers an error which is an overrun
# wrapped in a remote actor error.
assert excinfo.value.boxed_type == tractor.RemoteActorError
@ -1017,12 +1010,12 @@ async def echo_back_sequence(
) -> None:
'''
Send endlessly on the calleee stream using a small buffer size
Send endlessly on the child stream using a small buffer size
setting on the contex to simulate backlogging that would normally
cause overruns.
'''
# NOTE: ensure that if the caller is expecting to cancel this task
# NOTE: ensure that if the parent is expecting to cancel this task
# that we stay echoing much longer then they are so we don't
# return early instead of receive the cancel msg.
total_batches: int = (
@ -1072,18 +1065,18 @@ async def echo_back_sequence(
if be_slow:
await trio.sleep(0.05)
print('callee waiting on next')
print('child waiting on next')
print(f'callee echoing back latest batch\n{batch}')
print(f'child echoing back latest batch\n{batch}')
for msg in batch:
print(f'callee sending msg\n{msg}')
print(f'child sending msg\n{msg}')
await stream.send(msg)
try:
return 'yo'
finally:
print(
'exiting callee with context:\n'
'exiting child with context:\n'
f'{pformat(ctx)}\n'
)
@ -1137,7 +1130,7 @@ def test_maybe_allow_overruns_stream(
debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
'callee_sends_forever',
'child_sends_forever',
enable_modules=[__name__],
loglevel=loglevel,
debug_mode=debug_mode,

View File

@ -10,7 +10,7 @@ import tractor
from tractor._testing import (
tractor_test,
)
from conftest import (
from .conftest import (
sig_prog,
_INT_SIGNAL,
_INT_RETURN_CODE,

View File

@ -82,6 +82,7 @@ from .msg import (
MsgType,
NamespacePath,
PayloadT,
Return,
Started,
Stop,
Yield,
@ -245,11 +246,13 @@ class Context:
# a drain loop?
# _res_scope: trio.CancelScope|None = None
_outcome_msg: Return|Error|ContextCancelled = Unresolved
# on a clean exit there should be a final value
# delivered from the far end "callee" task, so
# this value is only set on one side.
# _result: Any | int = None
_result: Any|Unresolved = Unresolved
_result: PayloadT|Unresolved = Unresolved
# if the local "caller" task errors this value is always set
# to the error that was captured in the
@ -1199,9 +1202,11 @@ class Context:
'''
__tracebackhide__: bool = hide_tb
assert self._portal, (
'`Context.wait_for_result()` can not be called from callee side!'
)
if not self._portal:
raise RuntimeError(
'Invalid usage of `Context.wait_for_result()`!\n'
'Not valid on child-side IPC ctx!\n'
)
if self._final_result_is_set():
return self._result
@ -1222,6 +1227,8 @@ class Context:
# since every message should be delivered via the normal
# `._deliver_msg()` route which will appropriately set
# any `.maybe_error`.
outcome_msg: Return|Error|ContextCancelled
drained_msgs: list[MsgType]
(
outcome_msg,
drained_msgs,
@ -1229,11 +1236,19 @@ class Context:
ctx=self,
hide_tb=hide_tb,
)
drained_status: str = (
'Ctx drained to final outcome msg\n\n'
f'{outcome_msg}\n'
)
# ?XXX, should already be set in `._deliver_msg()` right?
if self._outcome_msg is not Unresolved:
# from .devx import _debug
# await _debug.pause()
assert self._outcome_msg is outcome_msg
else:
self._outcome_msg = outcome_msg
if drained_msgs:
drained_status += (
'\n'
@ -1741,7 +1756,6 @@ class Context:
f'{structfmt(msg)}\n'
)
# NOTE: if an error is deteced we should always still
# send it through the feeder-mem-chan and expect
# it to be raised by any context (stream) consumer
@ -1753,6 +1767,21 @@ class Context:
# normally the task that should get cancelled/error
# from some remote fault!
send_chan.send_nowait(msg)
match msg:
case Stop():
if (stream := self._stream):
stream._stop_msg = msg
case Return():
if not self._outcome_msg:
log.warning(
f'Setting final outcome msg AFTER '
f'`._rx_chan.send()`??\n'
f'\n'
f'{msg}'
)
self._outcome_msg = msg
return True
except trio.BrokenResourceError:
@ -2009,7 +2038,7 @@ async def open_context_from_portal(
# the dialog, the `Error` msg should be raised from the `msg`
# handling block below.
try:
started_msg, first = await ctx._pld_rx.recv_msg_w_pld(
started_msg, first = await ctx._pld_rx.recv_msg(
ipc=ctx,
expect_msg=Started,
passthrough_non_pld_msgs=False,
@ -2374,7 +2403,8 @@ async def open_context_from_portal(
# displaying `ContextCancelled` traces where the
# cause of crash/exit IS due to something in
# user/app code on either end of the context.
and not rxchan._closed
and
not rxchan._closed
):
# XXX NOTE XXX: and again as per above, we mask any
# `trio.Cancelled` raised here so as to NOT mask
@ -2433,6 +2463,7 @@ async def open_context_from_portal(
# FINALLY, remove the context from runtime tracking and
# exit!
log.runtime(
# log.cancel(
f'De-allocating IPC ctx opened with {ctx.side!r} peer \n'
f'uid: {uid}\n'
f'cid: {ctx.cid}\n'
@ -2488,7 +2519,6 @@ def mk_context(
_caller_info=caller_info,
**kwargs,
)
pld_rx._ctx = ctx
ctx._result = Unresolved
return ctx

View File

@ -184,7 +184,7 @@ class Portal:
(
self._final_result_msg,
self._final_result_pld,
) = await self._expect_result_ctx._pld_rx.recv_msg_w_pld(
) = await self._expect_result_ctx._pld_rx.recv_msg(
ipc=self._expect_result_ctx,
expect_msg=Return,
)

View File

@ -45,9 +45,11 @@ from .trionics import (
BroadcastReceiver,
)
from tractor.msg import (
# Return,
# Stop,
Error,
Return,
Stop,
MsgType,
PayloadT,
Yield,
)
@ -70,8 +72,7 @@ class MsgStream(trio.abc.Channel):
A bidirectional message stream for receiving logically sequenced
values over an inter-actor IPC `Channel`.
This is the type returned to a local task which entered either
`Portal.open_stream_from()` or `Context.open_stream()`.
Termination rules:
@ -94,6 +95,9 @@ class MsgStream(trio.abc.Channel):
self._rx_chan = rx_chan
self._broadcaster = _broadcaster
# any actual IPC msg which is effectively an `EndOfStream`
self._stop_msg: bool|Stop = False
# flag to denote end of stream
self._eoc: bool|trio.EndOfChannel = False
self._closed: bool|trio.ClosedResourceError = False
@ -125,16 +129,67 @@ class MsgStream(trio.abc.Channel):
def receive_nowait(
self,
expect_msg: MsgType = Yield,
):
) -> PayloadT:
ctx: Context = self._ctx
return ctx._pld_rx.recv_pld_nowait(
(
msg,
pld,
) = ctx._pld_rx.recv_msg_nowait(
ipc=self,
expect_msg=expect_msg,
)
# ?TODO, maybe factor this into a hyper-common `unwrap_pld()`
#
match msg:
# XXX, these never seems to ever hit? cool?
case Stop():
log.cancel(
f'Msg-stream was ended via stop msg\n'
f'{msg}'
)
case Error():
log.error(
f'Msg-stream was ended via error msg\n'
f'{msg}'
)
# XXX NOTE, always set any final result on the ctx to
# avoid teardown race conditions where previously this msg
# would be consumed silently (by `.aclose()` doing its
# own "msg drain loop" but WITHOUT those `drained: lists[MsgType]`
# being post-close-processed!
#
# !!TODO, see the equiv todo-comment in `.receive()`
# around the `if drained:` where we should prolly
# ACTUALLY be doing this post-close processing??
#
case Return(pld=pld):
log.warning(
f'Msg-stream final result msg for IPC ctx?\n'
f'{msg}'
)
# XXX TODO, this **should be covered** by higher
# scoped runtime-side method calls such as
# `Context._deliver_msg()`, so you should never
# really see the warning above or else something
# racy/out-of-order is likely going on between
# actor-runtime-side push tasks and the user-app-side
# consume tasks!
# -[ ] figure out that set of race cases and fix!
# -[ ] possibly return the `msg` given an input
# arg-flag is set so we can process the `Return`
# from the `.aclose()` caller?
#
# breakpoint() # to debug this RACE CASE!
ctx._result = pld
ctx._outcome_msg = msg
return pld
async def receive(
self,
hide_tb: bool = False,
):
'''
@ -154,7 +209,7 @@ class MsgStream(trio.abc.Channel):
# except trio.EndOfChannel:
# raise StopAsyncIteration
#
# see ``.aclose()`` for notes on the old behaviour prior to
# see `.aclose()` for notes on the old behaviour prior to
# introducing this
if self._eoc:
raise self._eoc
@ -165,7 +220,11 @@ class MsgStream(trio.abc.Channel):
src_err: Exception|None = None # orig tb
try:
ctx: Context = self._ctx
return await ctx._pld_rx.recv_pld(ipc=self)
pld = await ctx._pld_rx.recv_pld(
ipc=self,
expect_msg=Yield,
)
return pld
# XXX: the stream terminates on either of:
# - `self._rx_chan.receive()` raising after manual closure
@ -174,7 +233,7 @@ class MsgStream(trio.abc.Channel):
# - via a `Stop`-msg received from remote peer task.
# NOTE
# |_ previously this was triggered by calling
# ``._rx_chan.aclose()`` on the send side of the channel
# `._rx_chan.aclose()` on the send side of the channel
# inside `Actor._deliver_ctx_payload()`, but now the 'stop'
# message handling gets delegated to `PldRFx.recv_pld()`
# internals.
@ -198,11 +257,14 @@ class MsgStream(trio.abc.Channel):
# terminated and signal this local iterator to stop
drained: list[Exception|dict] = await self.aclose()
if drained:
# ?TODO? pass these to the `._ctx._drained_msgs: deque`
# and then iterate them as part of any `.wait_for_result()` call?
#
# from .devx import pause
# await pause()
# ^^^^^^^^TODO? pass these to the `._ctx._drained_msgs:
# deque` and then iterate them as part of any
# `.wait_for_result()` call?
#
# -[ ] move the match-case processing from
# `.receive_nowait()` instead to right here, use it from
# a for msg in drained:` post-proc loop?
#
log.warning(
'Drained context msgs during closure\n\n'
f'{drained}'
@ -265,9 +327,6 @@ class MsgStream(trio.abc.Channel):
- more or less we try to maintain adherance to trio's `.aclose()` semantics:
https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
'''
# rx_chan = self._rx_chan
# XXX NOTE XXX
# it's SUPER IMPORTANT that we ensure we don't DOUBLE
# DRAIN msgs on closure so avoid getting stuck handing on
@ -279,15 +338,16 @@ class MsgStream(trio.abc.Channel):
# this stream has already been closed so silently succeed as
# per ``trio.AsyncResource`` semantics.
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
# import tractor
# await tractor.pause()
return []
ctx: Context = self._ctx
drained: list[Exception|dict] = []
while not drained:
try:
maybe_final_msg = self.receive_nowait(
# allow_msgs=[Yield, Return],
expect_msg=Yield,
maybe_final_msg: Yield|Return = self.receive_nowait(
expect_msg=Yield|Return,
)
if maybe_final_msg:
log.debug(
@ -372,8 +432,10 @@ class MsgStream(trio.abc.Channel):
# await rx_chan.aclose()
if not self._eoc:
this_side: str = self._ctx.side
peer_side: str = self._ctx.peer_side
message: str = (
f'Stream self-closed by {self._ctx.side!r}-side before EoC\n'
f'Stream self-closed by {this_side!r}-side before EoC from {peer_side!r}\n'
# } bc a stream is a "scope"/msging-phase inside an IPC
f'x}}>\n'
f' |_{self}\n'
@ -381,9 +443,19 @@ class MsgStream(trio.abc.Channel):
log.cancel(message)
self._eoc = trio.EndOfChannel(message)
if (
(rx_chan := self._rx_chan)
and
(stats := rx_chan.statistics()).tasks_waiting_receive
):
log.cancel(
f'Msg-stream is closing but there is still reader tasks,\n'
f'{stats}\n'
)
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
# => NO, DEFINITELY NOT! <=
# if we're a bi-dir ``MsgStream`` BECAUSE this same
# if we're a bi-dir `MsgStream` BECAUSE this same
# core-msg-loop mem recv-chan is used to deliver the
# potential final result from the surrounding inter-actor
# `Context` so we don't want to close it until that

View File

@ -110,33 +110,11 @@ class PldRx(Struct):
# TODO: better to bind it here?
# _rx_mc: trio.MemoryReceiveChannel
_pld_dec: MsgDec
_ctx: Context|None = None
_ipc: Context|MsgStream|None = None
@property
def pld_dec(self) -> MsgDec:
return self._pld_dec
# TODO: a better name?
# -[ ] when would this be used as it avoids needingn to pass the
# ipc prim to every method
@cm
def wraps_ipc(
self,
ipc_prim: Context|MsgStream,
) -> PldRx:
'''
Apply this payload receiver to an IPC primitive type, one
of `Context` or `MsgStream`.
'''
self._ipc = ipc_prim
try:
yield self
finally:
self._ipc = None
@cm
def limit_plds(
self,
@ -169,7 +147,7 @@ class PldRx(Struct):
def dec(self) -> msgpack.Decoder:
return self._pld_dec.dec
def recv_pld_nowait(
def recv_msg_nowait(
self,
# TODO: make this `MsgStream` compat as well, see above^
# ipc_prim: Context|MsgStream,
@ -180,7 +158,15 @@ class PldRx(Struct):
hide_tb: bool = False,
**dec_pld_kwargs,
) -> Any|Raw:
) -> tuple[
MsgType[PayloadT],
PayloadT,
]:
'''
Attempt to non-blocking receive a message from the `._rx_chan` and
unwrap it's payload delivering the pair to the caller.
'''
__tracebackhide__: bool = hide_tb
msg: MsgType = (
@ -189,31 +175,78 @@ class PldRx(Struct):
# sync-rx msg from underlying IPC feeder (mem-)chan
ipc._rx_chan.receive_nowait()
)
if (
type(msg) is Return
):
log.info(
f'Rxed final result msg\n'
f'{msg}\n'
)
return self.decode_pld(
pld: PayloadT = self.decode_pld(
msg,
ipc=ipc,
expect_msg=expect_msg,
hide_tb=hide_tb,
**dec_pld_kwargs,
)
return (
msg,
pld,
)
async def recv_msg(
self,
ipc: Context|MsgStream,
expect_msg: MsgType,
# NOTE: ONLY for handling `Stop`-msgs that arrive during
# a call to `drain_to_final_msg()` above!
passthrough_non_pld_msgs: bool = True,
hide_tb: bool = True,
**decode_pld_kwargs,
) -> tuple[MsgType, PayloadT]:
'''
Retrieve the next avail IPC msg, decode its payload, and
return the (msg, pld) pair.
'''
__tracebackhide__: bool = hide_tb
msg: MsgType = await ipc._rx_chan.receive()
match msg:
case Return()|Error():
log.runtime(
f'Rxed final outcome msg\n'
f'{msg}\n'
)
case Stop():
log.runtime(
f'Rxed stream stopped msg\n'
f'{msg}\n'
)
if passthrough_non_pld_msgs:
return msg, None
# TODO: is there some way we can inject the decoded
# payload into an existing output buffer for the original
# msg instance?
pld: PayloadT = self.decode_pld(
msg,
ipc=ipc,
expect_msg=expect_msg,
hide_tb=hide_tb,
**decode_pld_kwargs,
)
return (
msg,
pld,
)
async def recv_pld(
self,
ipc: Context|MsgStream,
ipc_msg: MsgType|None = None,
ipc_msg: MsgType[PayloadT]|None = None,
expect_msg: Type[MsgType]|None = None,
hide_tb: bool = True,
**dec_pld_kwargs,
) -> Any|Raw:
) -> PayloadT:
'''
Receive a `MsgType`, then decode and return its `.pld` field.
@ -420,54 +453,6 @@ class PldRx(Struct):
__tracebackhide__: bool = False
raise
async def recv_msg_w_pld(
self,
ipc: Context|MsgStream,
expect_msg: MsgType,
# NOTE: generally speaking only for handling `Stop`-msgs that
# arrive during a call to `drain_to_final_msg()` above!
passthrough_non_pld_msgs: bool = True,
hide_tb: bool = True,
**kwargs,
) -> tuple[MsgType, PayloadT]:
'''
Retrieve the next avail IPC msg, decode it's payload, and
return the pair of refs.
'''
__tracebackhide__: bool = hide_tb
msg: MsgType = await ipc._rx_chan.receive()
if (
type(msg) is Return
):
log.info(
f'Rxed final result msg\n'
f'{msg}\n'
)
if passthrough_non_pld_msgs:
match msg:
case Stop():
return msg, None
# TODO: is there some way we can inject the decoded
# payload into an existing output buffer for the original
# msg instance?
pld: PayloadT = self.decode_pld(
msg,
ipc=ipc,
expect_msg=expect_msg,
hide_tb=hide_tb,
**kwargs,
)
# log.runtime(
# f'Delivering payload msg\n'
# f'{msg}\n'
# )
return msg, pld
@cm
def limit_plds(
@ -607,7 +592,7 @@ async def drain_to_final_msg(
# receive all msgs, scanning for either a final result
# or error; the underlying call should never raise any
# remote error directly!
msg, pld = await ctx._pld_rx.recv_msg_w_pld(
msg, pld = await ctx._pld_rx.recv_msg(
ipc=ctx,
expect_msg=Return,
raise_error=False,