Add `Context._outcome_msg` use new `PldRx` API

Such that any `Return` is always capture for each ctx instance and set
in `._deliver_msg()` normally; ensures we can at least introspect for it
when missing (like in a recently discovered stream teardown race bug).
Yes this augments the already existing `._result` which is dedicated for
the `._outcome_msg.pld` in the non-error case; we might want to see if
there's a nicer way to directly proxy ref to that without getting the
pre-pld-decoded `Raw` form with `msgspec`?

Also use the new `ctx._pld_rx.recv_msg()` and drop assigning
`pld_rx._ctx`.
Tyler Goodlet 2025-03-12 15:03:55 -04:00
parent debe63f4f2
commit c2d1bbe05e
2 changed files with 40 additions and 10 deletions

View File

@ -82,6 +82,7 @@ from .msg import (
MsgType, MsgType,
NamespacePath, NamespacePath,
PayloadT, PayloadT,
Return,
Started, Started,
Stop, Stop,
Yield, Yield,
@ -245,11 +246,13 @@ class Context:
# a drain loop? # a drain loop?
# _res_scope: trio.CancelScope|None = None # _res_scope: trio.CancelScope|None = None
_outcome_msg: Return|Error|ContextCancelled = Unresolved
# on a clean exit there should be a final value # on a clean exit there should be a final value
# delivered from the far end "callee" task, so # delivered from the far end "callee" task, so
# this value is only set on one side. # this value is only set on one side.
# _result: Any | int = None # _result: Any | int = None
_result: Any|Unresolved = Unresolved _result: PayloadT|Unresolved = Unresolved
# if the local "caller" task errors this value is always set # if the local "caller" task errors this value is always set
# to the error that was captured in the # to the error that was captured in the
@ -1199,9 +1202,11 @@ class Context:
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
assert self._portal, ( if not self._portal:
'`Context.wait_for_result()` can not be called from callee side!' raise RuntimeError(
) 'Invalid usage of `Context.wait_for_result()`!\n'
'Not valid on child-side IPC ctx!\n'
)
if self._final_result_is_set(): if self._final_result_is_set():
return self._result return self._result
@ -1222,6 +1227,8 @@ class Context:
# since every message should be delivered via the normal # since every message should be delivered via the normal
# `._deliver_msg()` route which will appropriately set # `._deliver_msg()` route which will appropriately set
# any `.maybe_error`. # any `.maybe_error`.
outcome_msg: Return|Error|ContextCancelled
drained_msgs: list[MsgType]
( (
outcome_msg, outcome_msg,
drained_msgs, drained_msgs,
@ -1229,11 +1236,19 @@ class Context:
ctx=self, ctx=self,
hide_tb=hide_tb, hide_tb=hide_tb,
) )
drained_status: str = ( drained_status: str = (
'Ctx drained to final outcome msg\n\n' 'Ctx drained to final outcome msg\n\n'
f'{outcome_msg}\n' f'{outcome_msg}\n'
) )
# ?XXX, should already be set in `._deliver_msg()` right?
if self._outcome_msg is not Unresolved:
# from .devx import _debug
# await _debug.pause()
assert self._outcome_msg is outcome_msg
else:
self._outcome_msg = outcome_msg
if drained_msgs: if drained_msgs:
drained_status += ( drained_status += (
'\n' '\n'
@ -1741,7 +1756,6 @@ class Context:
f'{structfmt(msg)}\n' f'{structfmt(msg)}\n'
) )
# NOTE: if an error is deteced we should always still # NOTE: if an error is deteced we should always still
# send it through the feeder-mem-chan and expect # send it through the feeder-mem-chan and expect
# it to be raised by any context (stream) consumer # it to be raised by any context (stream) consumer
@ -1753,6 +1767,21 @@ class Context:
# normally the task that should get cancelled/error # normally the task that should get cancelled/error
# from some remote fault! # from some remote fault!
send_chan.send_nowait(msg) send_chan.send_nowait(msg)
match msg:
case Stop():
if (stream := self._stream):
stream._stop_msg = msg
case Return():
if not self._outcome_msg:
log.warning(
f'Setting final outcome msg AFTER '
f'`._rx_chan.send()`??\n'
f'\n'
f'{msg}'
)
self._outcome_msg = msg
return True return True
except trio.BrokenResourceError: except trio.BrokenResourceError:
@ -2009,7 +2038,7 @@ async def open_context_from_portal(
# the dialog, the `Error` msg should be raised from the `msg` # the dialog, the `Error` msg should be raised from the `msg`
# handling block below. # handling block below.
try: try:
started_msg, first = await ctx._pld_rx.recv_msg_w_pld( started_msg, first = await ctx._pld_rx.recv_msg(
ipc=ctx, ipc=ctx,
expect_msg=Started, expect_msg=Started,
passthrough_non_pld_msgs=False, passthrough_non_pld_msgs=False,
@ -2374,7 +2403,8 @@ async def open_context_from_portal(
# displaying `ContextCancelled` traces where the # displaying `ContextCancelled` traces where the
# cause of crash/exit IS due to something in # cause of crash/exit IS due to something in
# user/app code on either end of the context. # user/app code on either end of the context.
and not rxchan._closed and
not rxchan._closed
): ):
# XXX NOTE XXX: and again as per above, we mask any # XXX NOTE XXX: and again as per above, we mask any
# `trio.Cancelled` raised here so as to NOT mask # `trio.Cancelled` raised here so as to NOT mask
@ -2433,6 +2463,7 @@ async def open_context_from_portal(
# FINALLY, remove the context from runtime tracking and # FINALLY, remove the context from runtime tracking and
# exit! # exit!
log.runtime( log.runtime(
# log.cancel(
f'De-allocating IPC ctx opened with {ctx.side!r} peer \n' f'De-allocating IPC ctx opened with {ctx.side!r} peer \n'
f'uid: {uid}\n' f'uid: {uid}\n'
f'cid: {ctx.cid}\n' f'cid: {ctx.cid}\n'
@ -2488,7 +2519,6 @@ def mk_context(
_caller_info=caller_info, _caller_info=caller_info,
**kwargs, **kwargs,
) )
pld_rx._ctx = ctx
ctx._result = Unresolved ctx._result = Unresolved
return ctx return ctx

View File

@ -184,7 +184,7 @@ class Portal:
( (
self._final_result_msg, self._final_result_msg,
self._final_result_pld, self._final_result_pld,
) = await self._expect_result_ctx._pld_rx.recv_msg_w_pld( ) = await self._expect_result_ctx._pld_rx.recv_msg(
ipc=self._expect_result_ctx, ipc=self._expect_result_ctx,
expect_msg=Return, expect_msg=Return,
) )