diff --git a/tractor/_context.py b/tractor/_context.py index 5590228..11975ba 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -169,8 +169,7 @@ async def _drain_to_final_msg( # only when we are sure the remote error is # the source cause of this local task's # cancellation. - if re := ctx._remote_error: - ctx._maybe_raise_remote_err(re) + ctx.maybe_raise() # CASE 1: we DID request the cancel we simply # continue to bubble up as normal. @@ -257,6 +256,13 @@ async def _drain_to_final_msg( ) # XXX fallthrough to handle expected error XXX + # TODO: replace this with `ctx.maybe_raise()` + # + # TODO: would this be handier for this case maybe? + # async with maybe_raise_on_exit() as raises: + # if raises: + # log.error('some msg about raising..') + re: Exception|None = ctx._remote_error if re: log.critical( @@ -595,7 +601,7 @@ class Context: if not re: return False - if from_uid := re.src_actor_uid: + if from_uid := re.src_uid: from_uid: tuple = tuple(from_uid) our_uid: tuple = self._actor.uid @@ -825,7 +831,7 @@ class Context: # cancellation. maybe_error_src: tuple = getattr( error, - 'src_actor_uid', + 'src_uid', None, ) self._canceller = ( @@ -1030,8 +1036,8 @@ class Context: @acm async def open_stream( self, - allow_overruns: bool | None = False, - msg_buffer_size: int | None = None, + allow_overruns: bool|None = False, + msg_buffer_size: int|None = None, ) -> AsyncGenerator[MsgStream, None]: ''' @@ -1071,13 +1077,16 @@ class Context: # absorbed there (silently) and we DO NOT want to # actually try to stream - a cancel msg was already # sent to the other side! - if self._remote_error: - # NOTE: this is diff then calling - # `._maybe_raise_remote_err()` specifically - # because any task entering this `.open_stream()` - # AFTER cancellation has already been requested, - # we DO NOT want to absorb any ctxc ACK silently! - raise self._remote_error + self.maybe_raise( + raise_ctxc_from_self_call=True, + ) + # NOTE: this is diff then calling + # `._maybe_raise_remote_err()` specifically + # because we want to raise a ctxc on any task entering this `.open_stream()` + # AFTER cancellation was already been requested, + # we DO NOT want to absorb any ctxc ACK silently! + # if self._remote_error: + # raise self._remote_error # XXX NOTE: if no `ContextCancelled` has been responded # back from the other side (yet), we raise a different @@ -1158,7 +1167,6 @@ class Context: # await trio.lowlevel.checkpoint() yield stream - # XXX: (MEGA IMPORTANT) if this is a root opened process we # wait for any immediate child in debug before popping the # context from the runtime msg loop otherwise inside @@ -1183,12 +1191,23 @@ class Context: # # await stream.aclose() - # if re := ctx._remote_error: - # ctx._maybe_raise_remote_err( - # re, - # raise_ctxc_from_self_call=True, - # ) - # await trio.lowlevel.checkpoint() + # NOTE: absorb and do not raise any + # EoC received from the other side such that + # it is not raised inside the surrounding + # context block's scope! + except trio.EndOfChannel as eoc: + if ( + eoc + and stream.closed + ): + # sanity, can remove? + assert eoc is stream._eoc + # from .devx import pause + # await pause() + log.warning( + 'Stream was terminated by EoC\n\n' + f'{repr(eoc)}\n' + ) finally: if self._portal: @@ -1204,7 +1223,6 @@ class Context: # TODO: replace all the instances of this!! XD def maybe_raise( self, - hide_tb: bool = True, **kwargs, @@ -1388,33 +1406,41 @@ class Context: f'{drained_msgs}' ) - if ( - (re := self._remote_error) - # and self._result == res_placeholder - ): - self._maybe_raise_remote_err( - re, - # NOTE: obvi we don't care if we - # overran the far end if we're already - # waiting on a final result (msg). - # raise_overrun_from_self=False, - raise_overrun_from_self=( - raise_overrun - and - # only when we ARE NOT the canceller - # should we raise overruns, bc ow we're - # raising something we know might happen - # during cancellation ;) - (not self._cancel_called) - ), + self.maybe_raise( + raise_overrun_from_self=( + raise_overrun + and + # only when we ARE NOT the canceller + # should we raise overruns, bc ow we're + # raising something we know might happen + # during cancellation ;) + (not self._cancel_called) ) + ) + # if ( + # (re := self._remote_error) + # # and self._result == res_placeholder + # ): + # self._maybe_raise_remote_err( + # re, + # # NOTE: obvi we don't care if we + # # overran the far end if we're already + # # waiting on a final result (msg). + # # raise_overrun_from_self=False, + # raise_overrun_from_self=( + # raise_overrun + # and + # # only when we ARE NOT the canceller + # # should we raise overruns, bc ow we're + # # raising something we know might happen + # # during cancellation ;) + # (not self._cancel_called) + # ), + # ) # if maybe_err: # self._result = maybe_err return self.outcome - # None if self._result == res_placeholder - # else self._result - # ) # TODO: switch this with above which should be named # `.wait_for_outcome()` and instead do @@ -1863,8 +1889,9 @@ async def open_context_from_portal( # TODO: if we set this the wrapping `@acm` body will # still be shown (awkwardly) on pdb REPL entry. Ideally - # we can similarly annotate that frame to NOT show? - hide_tb: bool = True, + # we can similarly annotate that frame to NOT show? for now + # we DO SHOW this frame since it's awkward ow.. + hide_tb: bool = False, # proxied to RPC **kwargs, diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 149bb35..e0015fe 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -136,7 +136,7 @@ class MsgStream(trio.abc.Channel): # return await self.receive() # except trio.EndOfChannel: # raise StopAsyncIteration - + # # see ``.aclose()`` for notes on the old behaviour prior to # introducing this if self._eoc: @@ -152,7 +152,6 @@ class MsgStream(trio.abc.Channel): return msg['yield'] except KeyError as kerr: - # log.exception('GOT KEYERROR') src_err = kerr # NOTE: may raise any of the below error types @@ -166,30 +165,20 @@ class MsgStream(trio.abc.Channel): stream=self, ) - # XXX: we close the stream on any of these error conditions: + # XXX: the stream terminates on either of: + # - via `self._rx_chan.receive()` raising after manual closure + # by the rpc-runtime OR, + # - via a received `{'stop': ...}` msg from remote side. + # |_ NOTE: previously this was triggered by calling + # ``._rx_chan.aclose()`` on the send side of the channel inside + # `Actor._push_result()`, but now the 'stop' message handling + # has been put just above inside `_raise_from_no_key_in_msg()`. except ( - # trio.ClosedResourceError, # by self._rx_chan - trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end + trio.EndOfChannel, ) as eoc: - # log.exception('GOT EOC') src_err = eoc self._eoc = eoc - # a ``ClosedResourceError`` indicates that the internal - # feeder memory receive channel was closed likely by the - # runtime after the associated transport-channel - # disconnected or broke. - - # an ``EndOfChannel`` indicates either the internal recv - # memchan exhausted **or** we raisesd it just above after - # receiving a `stop` message from the far end of the stream. - - # Previously this was triggered by calling ``.aclose()`` on - # the send side of the channel inside - # ``Actor._push_result()`` (should still be commented code - # there - which should eventually get removed), but now the - # 'stop' message handling has been put just above. - # TODO: Locally, we want to close this stream gracefully, by # terminating any local consumers tasks deterministically. # Once we have broadcast support, we **don't** want to be @@ -210,8 +199,11 @@ class MsgStream(trio.abc.Channel): # raise eoc - except trio.ClosedResourceError as cre: # by self._rx_chan - # log.exception('GOT CRE') + # a ``ClosedResourceError`` indicates that the internal + # feeder memory receive channel was closed likely by the + # runtime after the associated transport-channel + # disconnected or broke. + except trio.ClosedResourceError as cre: # by self._rx_chan.receive() src_err = cre log.warning( '`Context._rx_chan` was already closed?' @@ -237,15 +229,30 @@ class MsgStream(trio.abc.Channel): # over the end-of-stream connection error since likely # the remote error was the source cause? ctx: Context = self._ctx - if re := ctx._remote_error: - ctx._maybe_raise_remote_err( - re, - raise_ctxc_from_self_call=True, - ) + ctx.maybe_raise( + raise_ctxc_from_self_call=True, + ) - # propagate any error but hide low-level frames from - # caller by default. - if hide_tb: + # propagate any error but hide low-level frame details + # from the caller by default for debug noise reduction. + if ( + hide_tb + + # XXX NOTE XXX don't reraise on certain + # stream-specific internal error types like, + # + # - `trio.EoC` since we want to use the exact instance + # to ensure that it is the error that bubbles upward + # for silent absorption by `Context.open_stream()`. + and not self._eoc + + # - `RemoteActorError` (or `ContextCancelled`) if it gets + # raised from `_raise_from_no_key_in_msg()` since we + # want the same (as the above bullet) for any + # `.open_context()` block bubbled error raised by + # any nearby ctx API remote-failures. + # and not isinstance(src_err, RemoteActorError) + ): raise type(src_err)(*src_err.args) from src_err else: raise src_err @@ -370,6 +377,10 @@ class MsgStream(trio.abc.Channel): # await rx_chan.aclose() if not self._eoc: + log.cancel( + 'Stream closed before it received an EoC?\n' + 'Setting eoc manually..\n..' + ) self._eoc: bool = trio.EndOfChannel( f'Context stream closed by {self._ctx.side}\n' f'|_{self}\n' @@ -414,13 +425,11 @@ class MsgStream(trio.abc.Channel): @property def closed(self) -> bool: - if ( - (rxc := self._rx_chan._closed) - or - (_closed := self._closed) - or - (_eoc := self._eoc) - ): + + rxc: bool = self._rx_chan._closed + _closed: bool|Exception = self._closed + _eoc: bool|trio.EndOfChannel = self._eoc + if rxc or _closed or _eoc: log.runtime( f'`MsgStream` is already closed\n' f'{self}\n' @@ -496,7 +505,11 @@ class MsgStream(trio.abc.Channel): ''' __tracebackhide__: bool = hide_tb + # raise any alreay known error immediately self._ctx.maybe_raise() + if self._eoc: + raise self._eoc + if self._closed: raise self._closed