Use `Context._stream` in `_raise_from_unexpected_msg()`

Instead of expecting it to be passed in (as it was prior), when
determining if a `Stop` msg is a valid end-of-channel signal use the
`ctx._stream: MsgStream|None` attr which **must** be set by any stream
opening API; either of:
- `Context.open_stream()`
- `Portal.open_stream_from()`

Adjust the case block logic to match with fallthrough from any EoC to
a closed error if necessary. Change the `_type: str` to match the
failing IPC-prim name in the tail case we raise a `MessagingError`.

Other:
- move `.sender: tuple` uid attr up to `RemoteActorError` since `Error`
  optionally defines it as a field and for boxed `StreamOverrun`s (an
  ignore case we check for in the runtime during cancellation) we want
  it readable from the boxing rae.
- drop still unused `InternalActorError`.
runtime_to_msgspec
Tyler Goodlet 2024-04-24 12:31:05 -04:00
parent 5eb9144921
commit 18e97a8f9a
1 changed files with 49 additions and 58 deletions

View File

@ -532,7 +532,8 @@ class RemoteActorError(Exception):
self,
) -> BaseException:
'''
Unpack the inner-most source error from it's original IPC msg data.
Unpack the inner-most source error from it's original IPC
msg data.
We attempt to reconstruct (as best as we can) the original
`Exception` from as it would have been raised in the
@ -570,6 +571,14 @@ class RemoteActorError(Exception):
# # boxed_type=get_type_ref(..
# raise NotImplementedError
@property
def sender(self) -> tuple[str, str]|None:
if (
(msg := self._ipc_msg)
and (value := msg.sender)
):
return tuple(value)
class ContextCancelled(RemoteActorError):
'''
@ -734,20 +743,6 @@ class StreamOverrun(
handled by app code using `MsgStream.send()/.receive()`.
'''
@property
def sender(self) -> tuple[str, str] | None:
value = self._ipc_msg.sender
if value:
return tuple(value)
# class InternalActorError(RemoteActorError):
# '''
# Boxed (Remote) internal `tractor` error indicating failure of some
# primitive, machinery state or lowlevel task that should never
# occur.
# '''
class TransportClosed(trio.ClosedResourceError):
@ -945,7 +940,6 @@ def _raise_from_unexpected_msg(
log: StackLevelAdapter, # caller specific `log` obj
expect_msg: str = Yield,
stream: MsgStream | None = None,
# allow "deeper" tbs when debugging B^o
hide_tb: bool = True,
@ -987,6 +981,8 @@ def _raise_from_unexpected_msg(
) from src_err
# TODO: test that shows stream raising an expected error!!!
stream: MsgStream|None
_type: str = 'Context'
# raise the error message in a boxed exception type!
if isinstance(msg, Error):
@ -1003,55 +999,50 @@ def _raise_from_unexpected_msg(
# TODO: does it make more sense to pack
# the stream._eoc outside this in the calleer always?
# case Stop():
elif (
isinstance(msg, Stop)
or (
stream
and stream._eoc
)
):
log.debug(
f'Context[{cid}] stream was stopped by remote side\n'
f'cid: {cid}\n'
)
elif stream := ctx._stream:
_type: str = 'MsgStream'
# TODO: if the a local task is already blocking on
# a `Context.result()` and thus a `.receive()` on the
# rx-chan, we close the chan and set state ensuring that
# an eoc is raised!
if (
stream._eoc
or
isinstance(msg, Stop)
):
log.debug(
f'Context[{cid}] stream was stopped by remote side\n'
f'cid: {cid}\n'
)
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
eoc = trio.EndOfChannel(
f'Context stream ended due to msg:\n\n'
f'{pformat(msg)}\n'
)
# XXX: important to set so that a new `.receive()`
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the `return` message
# value out of the underlying feed mem chan which is
# destined for the `Context.result()` call during ctx-exit!
stream._eoc: Exception = eoc
# TODO: if the a local task is already blocking on
# a `Context.result()` and thus a `.receive()` on the
# rx-chan, we close the chan and set state ensuring that
# an eoc is raised!
# in case there already is some underlying remote error
# that arrived which is probably the source of this stream
# closure
ctx.maybe_raise()
raise eoc from src_err
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
eoc = trio.EndOfChannel(
f'Context stream ended due to msg:\n\n'
f'{pformat(msg)}\n'
)
# XXX: important to set so that a new `.receive()`
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the `return` message
# value out of the underlying feed mem chan which is
# destined for the `Context.result()` call during ctx-exit!
stream._eoc: Exception = eoc
if (
stream
and stream._closed
):
# TODO: our own error subtype?
raise trio.ClosedResourceError(
'This stream was closed'
)
# in case there already is some underlying remote error
# that arrived which is probably the source of this stream
# closure
ctx.maybe_raise()
raise eoc from src_err
if stream._closed:
# TODO: our own error subtype?
raise trio.ClosedResourceError('This stream was closed')
# always re-raise the source error if no translation error case
# is activated above.
_type: str = 'Stream' if stream else 'Context'
raise MessagingError(
f"{_type} was expecting a {expect_msg} message"
" BUT received a non-error msg:\n"