diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 314a93b..bc87164 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -36,8 +36,8 @@ import warnings import trio from ._exceptions import ( - # _raise_from_no_key_in_msg, ContextCancelled, + RemoteActorError, ) from .log import get_logger from .trionics import ( @@ -101,7 +101,7 @@ class MsgStream(trio.abc.Channel): @property def ctx(self) -> Context: ''' - This stream's IPC `Context` ref. + A read-only ref to this stream's inter-actor-task `Context`. ''' return self._ctx @@ -145,9 +145,8 @@ class MsgStream(trio.abc.Channel): ''' __tracebackhide__: bool = hide_tb - # NOTE: `trio.ReceiveChannel` implements - # EOC handling as follows (aka uses it - # to gracefully exit async for loops): + # NOTE FYI: `trio.ReceiveChannel` implements EOC handling as + # follows (aka uses it to gracefully exit async for loops): # # async def __anext__(self) -> ReceiveType: # try: @@ -165,48 +164,29 @@ class MsgStream(trio.abc.Channel): src_err: Exception|None = None # orig tb try: - ctx: Context = self._ctx return await ctx._pld_rx.recv_pld(ipc=self) # XXX: the stream terminates on either of: - # - via `self._rx_chan.receive()` raising after manual closure - # by the rpc-runtime OR, - # - via a received `{'stop': ...}` msg from remote side. - # |_ NOTE: previously this was triggered by calling - # ``._rx_chan.aclose()`` on the send side of the channel inside - # `Actor._deliver_ctx_payload()`, but now the 'stop' message handling - # has been put just above inside `_raise_from_no_key_in_msg()`. - except ( - trio.EndOfChannel, - ) as eoc: - src_err = eoc + # - `self._rx_chan.receive()` raising after manual closure + # by the rpc-runtime, + # OR + # - via a `Stop`-msg received from remote peer task. + # NOTE + # |_ previously this was triggered by calling + # ``._rx_chan.aclose()`` on the send side of the channel + # inside `Actor._deliver_ctx_payload()`, but now the 'stop' + # message handling gets delegated to `PldRFx.recv_pld()` + # internals. + except trio.EndOfChannel as eoc: + # a graceful stream finished signal self._eoc = eoc + src_err = eoc - # TODO: Locally, we want to close this stream gracefully, by - # terminating any local consumers tasks deterministically. - # Once we have broadcast support, we **don't** want to be - # closing this stream and not flushing a final value to - # remaining (clone) consumers who may not have been - # scheduled to receive it yet. - # try: - # maybe_err_msg_or_res: dict = self._rx_chan.receive_nowait() - # if maybe_err_msg_or_res: - # log.warning( - # 'Discarding un-processed msg:\n' - # f'{maybe_err_msg_or_res}' - # ) - # except trio.WouldBlock: - # # no queued msgs that might be another remote - # # error, so just raise the original EoC - # pass - - # raise eoc - - # a ``ClosedResourceError`` indicates that the internal - # feeder memory receive channel was closed likely by the - # runtime after the associated transport-channel - # disconnected or broke. + # a `ClosedResourceError` indicates that the internal feeder + # memory receive channel was closed likely by the runtime + # after the associated transport-channel disconnected or + # broke. except trio.ClosedResourceError as cre: # by self._rx_chan.receive() src_err = cre log.warning( @@ -218,14 +198,15 @@ class MsgStream(trio.abc.Channel): # terminated and signal this local iterator to stop drained: list[Exception|dict] = await self.aclose() if drained: + # ?TODO? pass these to the `._ctx._drained_msgs: deque` + # and then iterate them as part of any `.wait_for_result()` call? + # # from .devx import pause # await pause() log.warning( - 'Drained context msgs during closure:\n' + 'Drained context msgs during closure\n\n' f'{drained}' ) - # TODO: pass these to the `._ctx._drained_msgs: deque` - # and then iterate them as part of any `.result()` call? # NOTE XXX: if the context was cancelled or remote-errored # but we received the stream close msg first, we @@ -238,28 +219,36 @@ class MsgStream(trio.abc.Channel): from_src_exc=src_err, ) - # propagate any error but hide low-level frame details - # from the caller by default for debug noise reduction. + # propagate any error but hide low-level frame details from + # the caller by default for console/debug-REPL noise + # reduction. if ( hide_tb + and ( - # XXX NOTE XXX don't reraise on certain - # stream-specific internal error types like, - # - # - `trio.EoC` since we want to use the exact instance - # to ensure that it is the error that bubbles upward - # for silent absorption by `Context.open_stream()`. - and not self._eoc + # XXX NOTE special conditions: don't reraise on + # certain stream-specific internal error types like, + # + # - `trio.EoC` since we want to use the exact instance + # to ensure that it is the error that bubbles upward + # for silent absorption by `Context.open_stream()`. + not self._eoc - # - `RemoteActorError` (or `ContextCancelled`) if it gets - # raised from `_raise_from_no_key_in_msg()` since we - # want the same (as the above bullet) for any - # `.open_context()` block bubbled error raised by - # any nearby ctx API remote-failures. - # and not isinstance(src_err, RemoteActorError) + # - `RemoteActorError` (or subtypes like ctxc) + # since we want to present the error as though it is + # "sourced" directly from this `.receive()` call and + # generally NOT include the stack frames raised from + # inside the `PldRx` and/or the transport stack + # layers. + or isinstance(src_err, RemoteActorError) + ) ): raise type(src_err)(*src_err.args) from src_err else: + # for any non-graceful-EOC we want to NOT hide this frame + if not self._eoc: + __tracebackhide__: bool = False + raise src_err async def aclose(self) -> list[Exception|dict]: @@ -385,6 +374,8 @@ class MsgStream(trio.abc.Channel): if not self._eoc: message: str = ( f'Stream self-closed by {self._ctx.side!r}-side before EoC\n' + # } bc a stream is a "scope"/msging-phase inside an IPC + f'x}}>\n' f'|_{self}\n' ) log.cancel(message)