forked from goodboy/tractor
1
0
Fork 0

Reraise RAEs in `MsgStream.receive()`; truncate tbs

To avoid showing lowlevel details of exception handling around the
underlying call to `return await self._ctx._pld_rx.recv_pld(ipc=self)`,
any time a `RemoteActorError` is unpacked (an raised locally) we re-raise
it directly from the captured `src_err` captured so as to present to
the user/app caller-code an exception raised directly from the `.receive()`
frame. This simplifies traceback call-stacks for any `log.exception()`
or `pdb`-REPL output filtering out the lower `PldRx` frames by default.
aio_abandons
Tyler Goodlet 2024-07-02 16:00:26 -04:00
parent 3c5816c977
commit 02812b9f51
1 changed files with 50 additions and 59 deletions

View File

@ -36,8 +36,8 @@ import warnings
import trio import trio
from ._exceptions import ( from ._exceptions import (
# _raise_from_no_key_in_msg,
ContextCancelled, ContextCancelled,
RemoteActorError,
) )
from .log import get_logger from .log import get_logger
from .trionics import ( from .trionics import (
@ -101,7 +101,7 @@ class MsgStream(trio.abc.Channel):
@property @property
def ctx(self) -> Context: def ctx(self) -> Context:
''' '''
This stream's IPC `Context` ref. A read-only ref to this stream's inter-actor-task `Context`.
''' '''
return self._ctx return self._ctx
@ -145,9 +145,8 @@ class MsgStream(trio.abc.Channel):
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
# NOTE: `trio.ReceiveChannel` implements # NOTE FYI: `trio.ReceiveChannel` implements EOC handling as
# EOC handling as follows (aka uses it # follows (aka uses it to gracefully exit async for loops):
# to gracefully exit async for loops):
# #
# async def __anext__(self) -> ReceiveType: # async def __anext__(self) -> ReceiveType:
# try: # try:
@ -165,48 +164,29 @@ class MsgStream(trio.abc.Channel):
src_err: Exception|None = None # orig tb src_err: Exception|None = None # orig tb
try: try:
ctx: Context = self._ctx ctx: Context = self._ctx
return await ctx._pld_rx.recv_pld(ipc=self) return await ctx._pld_rx.recv_pld(ipc=self)
# XXX: the stream terminates on either of: # XXX: the stream terminates on either of:
# - via `self._rx_chan.receive()` raising after manual closure # - `self._rx_chan.receive()` raising after manual closure
# by the rpc-runtime OR, # by the rpc-runtime,
# - via a received `{'stop': ...}` msg from remote side. # OR
# |_ NOTE: previously this was triggered by calling # - via a `Stop`-msg received from remote peer task.
# ``._rx_chan.aclose()`` on the send side of the channel inside # NOTE
# `Actor._deliver_ctx_payload()`, but now the 'stop' message handling # |_ previously this was triggered by calling
# has been put just above inside `_raise_from_no_key_in_msg()`. # ``._rx_chan.aclose()`` on the send side of the channel
except ( # inside `Actor._deliver_ctx_payload()`, but now the 'stop'
trio.EndOfChannel, # message handling gets delegated to `PldRFx.recv_pld()`
) as eoc: # internals.
src_err = eoc except trio.EndOfChannel as eoc:
# a graceful stream finished signal
self._eoc = eoc self._eoc = eoc
src_err = eoc
# TODO: Locally, we want to close this stream gracefully, by # a `ClosedResourceError` indicates that the internal feeder
# terminating any local consumers tasks deterministically. # memory receive channel was closed likely by the runtime
# Once we have broadcast support, we **don't** want to be # after the associated transport-channel disconnected or
# closing this stream and not flushing a final value to # broke.
# remaining (clone) consumers who may not have been
# scheduled to receive it yet.
# try:
# maybe_err_msg_or_res: dict = self._rx_chan.receive_nowait()
# if maybe_err_msg_or_res:
# log.warning(
# 'Discarding un-processed msg:\n'
# f'{maybe_err_msg_or_res}'
# )
# except trio.WouldBlock:
# # no queued msgs that might be another remote
# # error, so just raise the original EoC
# pass
# raise eoc
# a ``ClosedResourceError`` indicates that the internal
# feeder memory receive channel was closed likely by the
# runtime after the associated transport-channel
# disconnected or broke.
except trio.ClosedResourceError as cre: # by self._rx_chan.receive() except trio.ClosedResourceError as cre: # by self._rx_chan.receive()
src_err = cre src_err = cre
log.warning( log.warning(
@ -218,14 +198,15 @@ class MsgStream(trio.abc.Channel):
# terminated and signal this local iterator to stop # terminated and signal this local iterator to stop
drained: list[Exception|dict] = await self.aclose() drained: list[Exception|dict] = await self.aclose()
if drained: if drained:
# ?TODO? pass these to the `._ctx._drained_msgs: deque`
# and then iterate them as part of any `.wait_for_result()` call?
#
# from .devx import pause # from .devx import pause
# await pause() # await pause()
log.warning( log.warning(
'Drained context msgs during closure:\n' 'Drained context msgs during closure\n\n'
f'{drained}' f'{drained}'
) )
# TODO: pass these to the `._ctx._drained_msgs: deque`
# and then iterate them as part of any `.result()` call?
# NOTE XXX: if the context was cancelled or remote-errored # NOTE XXX: if the context was cancelled or remote-errored
# but we received the stream close msg first, we # but we received the stream close msg first, we
@ -238,28 +219,36 @@ class MsgStream(trio.abc.Channel):
from_src_exc=src_err, from_src_exc=src_err,
) )
# propagate any error but hide low-level frame details # propagate any error but hide low-level frame details from
# from the caller by default for debug noise reduction. # the caller by default for console/debug-REPL noise
# reduction.
if ( if (
hide_tb hide_tb
and (
# XXX NOTE XXX don't reraise on certain # XXX NOTE special conditions: don't reraise on
# stream-specific internal error types like, # certain stream-specific internal error types like,
# #
# - `trio.EoC` since we want to use the exact instance # - `trio.EoC` since we want to use the exact instance
# to ensure that it is the error that bubbles upward # to ensure that it is the error that bubbles upward
# for silent absorption by `Context.open_stream()`. # for silent absorption by `Context.open_stream()`.
and not self._eoc not self._eoc
# - `RemoteActorError` (or `ContextCancelled`) if it gets # - `RemoteActorError` (or subtypes like ctxc)
# raised from `_raise_from_no_key_in_msg()` since we # since we want to present the error as though it is
# want the same (as the above bullet) for any # "sourced" directly from this `.receive()` call and
# `.open_context()` block bubbled error raised by # generally NOT include the stack frames raised from
# any nearby ctx API remote-failures. # inside the `PldRx` and/or the transport stack
# and not isinstance(src_err, RemoteActorError) # layers.
or isinstance(src_err, RemoteActorError)
)
): ):
raise type(src_err)(*src_err.args) from src_err raise type(src_err)(*src_err.args) from src_err
else: else:
# for any non-graceful-EOC we want to NOT hide this frame
if not self._eoc:
__tracebackhide__: bool = False
raise src_err raise src_err
async def aclose(self) -> list[Exception|dict]: async def aclose(self) -> list[Exception|dict]:
@ -385,6 +374,8 @@ class MsgStream(trio.abc.Channel):
if not self._eoc: if not self._eoc:
message: str = ( message: str = (
f'Stream self-closed by {self._ctx.side!r}-side before EoC\n' f'Stream self-closed by {self._ctx.side!r}-side before EoC\n'
# } bc a stream is a "scope"/msging-phase inside an IPC
f'x}}>\n'
f'|_{self}\n' f'|_{self}\n'
) )
log.cancel(message) log.cancel(message)