From 18e97a8f9ae88000bfaa29b713c7b282006d363f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 24 Apr 2024 12:31:05 -0400 Subject: [PATCH] Use `Context._stream` in `_raise_from_unexpected_msg()` Instead of expecting it to be passed in (as it was prior), when determining if a `Stop` msg is a valid end-of-channel signal use the `ctx._stream: MsgStream|None` attr which **must** be set by any stream opening API; either of: - `Context.open_stream()` - `Portal.open_stream_from()` Adjust the case block logic to match with fallthrough from any EoC to a closed error if necessary. Change the `_type: str` to match the failing IPC-prim name in the tail case we raise a `MessagingError`. Other: - move `.sender: tuple` uid attr up to `RemoteActorError` since `Error` optionally defines it as a field and for boxed `StreamOverrun`s (an ignore case we check for in the runtime during cancellation) we want it readable from the boxing rae. - drop still unused `InternalActorError`. --- tractor/_exceptions.py | 107 +++++++++++++++++++---------------------- 1 file changed, 49 insertions(+), 58 deletions(-) diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index b2ba6e8..8d9274f 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -532,7 +532,8 @@ class RemoteActorError(Exception): self, ) -> BaseException: ''' - Unpack the inner-most source error from it's original IPC msg data. + Unpack the inner-most source error from it's original IPC + msg data. We attempt to reconstruct (as best as we can) the original `Exception` from as it would have been raised in the @@ -570,6 +571,14 @@ class RemoteActorError(Exception): # # boxed_type=get_type_ref(.. # raise NotImplementedError + @property + def sender(self) -> tuple[str, str]|None: + if ( + (msg := self._ipc_msg) + and (value := msg.sender) + ): + return tuple(value) + class ContextCancelled(RemoteActorError): ''' @@ -734,20 +743,6 @@ class StreamOverrun( handled by app code using `MsgStream.send()/.receive()`. ''' - @property - def sender(self) -> tuple[str, str] | None: - value = self._ipc_msg.sender - if value: - return tuple(value) - - -# class InternalActorError(RemoteActorError): -# ''' -# Boxed (Remote) internal `tractor` error indicating failure of some -# primitive, machinery state or lowlevel task that should never -# occur. - -# ''' class TransportClosed(trio.ClosedResourceError): @@ -945,7 +940,6 @@ def _raise_from_unexpected_msg( log: StackLevelAdapter, # caller specific `log` obj expect_msg: str = Yield, - stream: MsgStream | None = None, # allow "deeper" tbs when debugging B^o hide_tb: bool = True, @@ -987,6 +981,8 @@ def _raise_from_unexpected_msg( ) from src_err # TODO: test that shows stream raising an expected error!!! + stream: MsgStream|None + _type: str = 'Context' # raise the error message in a boxed exception type! if isinstance(msg, Error): @@ -1003,55 +999,50 @@ def _raise_from_unexpected_msg( # TODO: does it make more sense to pack # the stream._eoc outside this in the calleer always? # case Stop(): - elif ( - isinstance(msg, Stop) - or ( - stream - and stream._eoc - ) - ): - log.debug( - f'Context[{cid}] stream was stopped by remote side\n' - f'cid: {cid}\n' - ) + elif stream := ctx._stream: + _type: str = 'MsgStream' - # TODO: if the a local task is already blocking on - # a `Context.result()` and thus a `.receive()` on the - # rx-chan, we close the chan and set state ensuring that - # an eoc is raised! + if ( + stream._eoc + or + isinstance(msg, Stop) + ): + log.debug( + f'Context[{cid}] stream was stopped by remote side\n' + f'cid: {cid}\n' + ) - # XXX: this causes ``ReceiveChannel.__anext__()`` to - # raise a ``StopAsyncIteration`` **and** in our catch - # block below it will trigger ``.aclose()``. - eoc = trio.EndOfChannel( - f'Context stream ended due to msg:\n\n' - f'{pformat(msg)}\n' - ) - # XXX: important to set so that a new `.receive()` - # call (likely by another task using a broadcast receiver) - # doesn't accidentally pull the `return` message - # value out of the underlying feed mem chan which is - # destined for the `Context.result()` call during ctx-exit! - stream._eoc: Exception = eoc + # TODO: if the a local task is already blocking on + # a `Context.result()` and thus a `.receive()` on the + # rx-chan, we close the chan and set state ensuring that + # an eoc is raised! - # in case there already is some underlying remote error - # that arrived which is probably the source of this stream - # closure - ctx.maybe_raise() - raise eoc from src_err + # XXX: this causes ``ReceiveChannel.__anext__()`` to + # raise a ``StopAsyncIteration`` **and** in our catch + # block below it will trigger ``.aclose()``. + eoc = trio.EndOfChannel( + f'Context stream ended due to msg:\n\n' + f'{pformat(msg)}\n' + ) + # XXX: important to set so that a new `.receive()` + # call (likely by another task using a broadcast receiver) + # doesn't accidentally pull the `return` message + # value out of the underlying feed mem chan which is + # destined for the `Context.result()` call during ctx-exit! + stream._eoc: Exception = eoc - if ( - stream - and stream._closed - ): - # TODO: our own error subtype? - raise trio.ClosedResourceError( - 'This stream was closed' - ) + # in case there already is some underlying remote error + # that arrived which is probably the source of this stream + # closure + ctx.maybe_raise() + raise eoc from src_err + + if stream._closed: + # TODO: our own error subtype? + raise trio.ClosedResourceError('This stream was closed') # always re-raise the source error if no translation error case # is activated above. - _type: str = 'Stream' if stream else 'Context' raise MessagingError( f"{_type} was expecting a {expect_msg} message" " BUT received a non-error msg:\n"