From c2d1bbe05ecd4caecd36fe3521c28bf0d3b9de2d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 12 Mar 2025 15:03:55 -0400 Subject: [PATCH] Add `Context._outcome_msg` use new `PldRx` API Such that any `Return` is always capture for each ctx instance and set in `._deliver_msg()` normally; ensures we can at least introspect for it when missing (like in a recently discovered stream teardown race bug). Yes this augments the already existing `._result` which is dedicated for the `._outcome_msg.pld` in the non-error case; we might want to see if there's a nicer way to directly proxy ref to that without getting the pre-pld-decoded `Raw` form with `msgspec`? Also use the new `ctx._pld_rx.recv_msg()` and drop assigning `pld_rx._ctx`. --- tractor/_context.py | 48 ++++++++++++++++++++++++++++++++++++--------- tractor/_portal.py | 2 +- 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/tractor/_context.py b/tractor/_context.py index 5d6ccf69..201e920a 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -82,6 +82,7 @@ from .msg import ( MsgType, NamespacePath, PayloadT, + Return, Started, Stop, Yield, @@ -245,11 +246,13 @@ class Context: # a drain loop? # _res_scope: trio.CancelScope|None = None + _outcome_msg: Return|Error|ContextCancelled = Unresolved + # on a clean exit there should be a final value # delivered from the far end "callee" task, so # this value is only set on one side. # _result: Any | int = None - _result: Any|Unresolved = Unresolved + _result: PayloadT|Unresolved = Unresolved # if the local "caller" task errors this value is always set # to the error that was captured in the @@ -1199,9 +1202,11 @@ class Context: ''' __tracebackhide__: bool = hide_tb - assert self._portal, ( - '`Context.wait_for_result()` can not be called from callee side!' - ) + if not self._portal: + raise RuntimeError( + 'Invalid usage of `Context.wait_for_result()`!\n' + 'Not valid on child-side IPC ctx!\n' + ) if self._final_result_is_set(): return self._result @@ -1222,6 +1227,8 @@ class Context: # since every message should be delivered via the normal # `._deliver_msg()` route which will appropriately set # any `.maybe_error`. + outcome_msg: Return|Error|ContextCancelled + drained_msgs: list[MsgType] ( outcome_msg, drained_msgs, @@ -1229,11 +1236,19 @@ class Context: ctx=self, hide_tb=hide_tb, ) - drained_status: str = ( 'Ctx drained to final outcome msg\n\n' f'{outcome_msg}\n' ) + + # ?XXX, should already be set in `._deliver_msg()` right? + if self._outcome_msg is not Unresolved: + # from .devx import _debug + # await _debug.pause() + assert self._outcome_msg is outcome_msg + else: + self._outcome_msg = outcome_msg + if drained_msgs: drained_status += ( '\n' @@ -1741,7 +1756,6 @@ class Context: f'{structfmt(msg)}\n' ) - # NOTE: if an error is deteced we should always still # send it through the feeder-mem-chan and expect # it to be raised by any context (stream) consumer @@ -1753,6 +1767,21 @@ class Context: # normally the task that should get cancelled/error # from some remote fault! send_chan.send_nowait(msg) + match msg: + case Stop(): + if (stream := self._stream): + stream._stop_msg = msg + + case Return(): + if not self._outcome_msg: + log.warning( + f'Setting final outcome msg AFTER ' + f'`._rx_chan.send()`??\n' + f'\n' + f'{msg}' + ) + self._outcome_msg = msg + return True except trio.BrokenResourceError: @@ -2009,7 +2038,7 @@ async def open_context_from_portal( # the dialog, the `Error` msg should be raised from the `msg` # handling block below. try: - started_msg, first = await ctx._pld_rx.recv_msg_w_pld( + started_msg, first = await ctx._pld_rx.recv_msg( ipc=ctx, expect_msg=Started, passthrough_non_pld_msgs=False, @@ -2374,7 +2403,8 @@ async def open_context_from_portal( # displaying `ContextCancelled` traces where the # cause of crash/exit IS due to something in # user/app code on either end of the context. - and not rxchan._closed + and + not rxchan._closed ): # XXX NOTE XXX: and again as per above, we mask any # `trio.Cancelled` raised here so as to NOT mask @@ -2433,6 +2463,7 @@ async def open_context_from_portal( # FINALLY, remove the context from runtime tracking and # exit! log.runtime( + # log.cancel( f'De-allocating IPC ctx opened with {ctx.side!r} peer \n' f'uid: {uid}\n' f'cid: {ctx.cid}\n' @@ -2488,7 +2519,6 @@ def mk_context( _caller_info=caller_info, **kwargs, ) - pld_rx._ctx = ctx ctx._result = Unresolved return ctx diff --git a/tractor/_portal.py b/tractor/_portal.py index 7fbf69b2..cee10c47 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -184,7 +184,7 @@ class Portal: ( self._final_result_msg, self._final_result_pld, - ) = await self._expect_result_ctx._pld_rx.recv_msg_w_pld( + ) = await self._expect_result_ctx._pld_rx.recv_msg( ipc=self._expect_result_ctx, expect_msg=Return, )