From 51f72801d2b823924bc01482948702268f9aaec7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 12 Mar 2025 16:24:39 -0400 Subject: [PATCH] Add `MsgStream._stop_msg` use new `PldRx` API In particular ensuring we use `ctx._pld_rx.recv_msg_nowait()` from `.receive_nowait()` (which is called from `.aclose()`) such that we ALWAYS (can) set the surrounding `Context._result/._outcome_msg` attrs on reception of a final `Return`!! This fixes a final stream-teardown-race-condition-bug where prior we normally didn't set the `Context._result/._outcome_msg` in such cases. This is **precisely because** `.receive_nowait()` only returns the `pld` and when called from `.aclose()` this value is discarded, meaning so is its boxing `Return` despite consuming it from the underlying `._rx_chan`.. Longer term this should be solved differently by ensuring such races cases are handled at a higher scope like inside `Context._deliver_msg()` or the `Portal.open_context()` enter/exit blocks? Add a detailed warning note and todos for all this around the special case block! --- tractor/_streaming.py | 118 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 95 insertions(+), 23 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 58e9b069..2ff2d41c 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -45,9 +45,11 @@ from .trionics import ( BroadcastReceiver, ) from tractor.msg import ( - # Return, - # Stop, + Error, + Return, + Stop, MsgType, + PayloadT, Yield, ) @@ -70,8 +72,7 @@ class MsgStream(trio.abc.Channel): A bidirectional message stream for receiving logically sequenced values over an inter-actor IPC `Channel`. - This is the type returned to a local task which entered either - `Portal.open_stream_from()` or `Context.open_stream()`. + Termination rules: @@ -94,6 +95,9 @@ class MsgStream(trio.abc.Channel): self._rx_chan = rx_chan self._broadcaster = _broadcaster + # any actual IPC msg which is effectively an `EndOfStream` + self._stop_msg: bool|Stop = False + # flag to denote end of stream self._eoc: bool|trio.EndOfChannel = False self._closed: bool|trio.ClosedResourceError = False @@ -125,16 +129,67 @@ class MsgStream(trio.abc.Channel): def receive_nowait( self, expect_msg: MsgType = Yield, - ): + ) -> PayloadT: ctx: Context = self._ctx - return ctx._pld_rx.recv_pld_nowait( + ( + msg, + pld, + ) = ctx._pld_rx.recv_msg_nowait( ipc=self, expect_msg=expect_msg, ) + # ?TODO, maybe factor this into a hyper-common `unwrap_pld()` + # + match msg: + + # XXX, these never seems to ever hit? cool? + case Stop(): + log.cancel( + f'Msg-stream was ended via stop msg\n' + f'{msg}' + ) + case Error(): + log.error( + f'Msg-stream was ended via error msg\n' + f'{msg}' + ) + + # XXX NOTE, always set any final result on the ctx to + # avoid teardown race conditions where previously this msg + # would be consumed silently (by `.aclose()` doing its + # own "msg drain loop" but WITHOUT those `drained: lists[MsgType]` + # being post-close-processed! + # + # !!TODO, see the equiv todo-comment in `.receive()` + # around the `if drained:` where we should prolly + # ACTUALLY be doing this post-close processing?? + # + case Return(pld=pld): + log.warning( + f'Msg-stream final result msg for IPC ctx?\n' + f'{msg}' + ) + # XXX TODO, this **should be covered** by higher + # scoped runtime-side method calls such as + # `Context._deliver_msg()`, so you should never + # really see the warning above or else something + # racy/out-of-order is likely going on between + # actor-runtime-side push tasks and the user-app-side + # consume tasks! + # -[ ] figure out that set of race cases and fix! + # -[ ] possibly return the `msg` given an input + # arg-flag is set so we can process the `Return` + # from the `.aclose()` caller? + # + # breakpoint() # to debug this RACE CASE! + ctx._result = pld + ctx._outcome_msg = msg + + return pld + async def receive( self, - hide_tb: bool = False, ): ''' @@ -154,7 +209,7 @@ class MsgStream(trio.abc.Channel): # except trio.EndOfChannel: # raise StopAsyncIteration # - # see ``.aclose()`` for notes on the old behaviour prior to + # see `.aclose()` for notes on the old behaviour prior to # introducing this if self._eoc: raise self._eoc @@ -165,7 +220,11 @@ class MsgStream(trio.abc.Channel): src_err: Exception|None = None # orig tb try: ctx: Context = self._ctx - return await ctx._pld_rx.recv_pld(ipc=self) + pld = await ctx._pld_rx.recv_pld( + ipc=self, + expect_msg=Yield, + ) + return pld # XXX: the stream terminates on either of: # - `self._rx_chan.receive()` raising after manual closure @@ -174,7 +233,7 @@ class MsgStream(trio.abc.Channel): # - via a `Stop`-msg received from remote peer task. # NOTE # |_ previously this was triggered by calling - # ``._rx_chan.aclose()`` on the send side of the channel + # `._rx_chan.aclose()` on the send side of the channel # inside `Actor._deliver_ctx_payload()`, but now the 'stop' # message handling gets delegated to `PldRFx.recv_pld()` # internals. @@ -198,11 +257,14 @@ class MsgStream(trio.abc.Channel): # terminated and signal this local iterator to stop drained: list[Exception|dict] = await self.aclose() if drained: - # ?TODO? pass these to the `._ctx._drained_msgs: deque` - # and then iterate them as part of any `.wait_for_result()` call? - # - # from .devx import pause - # await pause() + # ^^^^^^^^TODO? pass these to the `._ctx._drained_msgs: + # deque` and then iterate them as part of any + # `.wait_for_result()` call? + # + # -[ ] move the match-case processing from + # `.receive_nowait()` instead to right here, use it from + # a for msg in drained:` post-proc loop? + # log.warning( 'Drained context msgs during closure\n\n' f'{drained}' @@ -265,9 +327,6 @@ class MsgStream(trio.abc.Channel): - more or less we try to maintain adherance to trio's `.aclose()` semantics: https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose ''' - - # rx_chan = self._rx_chan - # XXX NOTE XXX # it's SUPER IMPORTANT that we ensure we don't DOUBLE # DRAIN msgs on closure so avoid getting stuck handing on @@ -279,15 +338,16 @@ class MsgStream(trio.abc.Channel): # this stream has already been closed so silently succeed as # per ``trio.AsyncResource`` semantics. # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose + # import tractor + # await tractor.pause() return [] ctx: Context = self._ctx drained: list[Exception|dict] = [] while not drained: try: - maybe_final_msg = self.receive_nowait( - # allow_msgs=[Yield, Return], - expect_msg=Yield, + maybe_final_msg: Yield|Return = self.receive_nowait( + expect_msg=Yield|Return, ) if maybe_final_msg: log.debug( @@ -372,8 +432,10 @@ class MsgStream(trio.abc.Channel): # await rx_chan.aclose() if not self._eoc: + this_side: str = self._ctx.side + peer_side: str = self._ctx.peer_side message: str = ( - f'Stream self-closed by {self._ctx.side!r}-side before EoC\n' + f'Stream self-closed by {this_side!r}-side before EoC from {peer_side!r}\n' # } bc a stream is a "scope"/msging-phase inside an IPC f'x}}>\n' f' |_{self}\n' @@ -381,9 +443,19 @@ class MsgStream(trio.abc.Channel): log.cancel(message) self._eoc = trio.EndOfChannel(message) + if ( + (rx_chan := self._rx_chan) + and + (stats := rx_chan.statistics()).tasks_waiting_receive + ): + log.cancel( + f'Msg-stream is closing but there is still reader tasks,\n' + f'{stats}\n' + ) + # ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX? # => NO, DEFINITELY NOT! <= - # if we're a bi-dir ``MsgStream`` BECAUSE this same + # if we're a bi-dir `MsgStream` BECAUSE this same # core-msg-loop mem recv-chan is used to deliver the # potential final result from the surrounding inter-actor # `Context` so we don't want to close it until that