diff --git a/tractor/_streaming.py b/tractor/_streaming.py index e449fef..f02197b 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -23,6 +23,7 @@ The machinery and types behind ``Context.open_stream()`` from __future__ import annotations import inspect from contextlib import asynccontextmanager as acm +from pprint import pformat from typing import ( Any, Callable, @@ -35,6 +36,7 @@ import trio from ._exceptions import ( unpack_error, + MessagingError, ) from .log import get_logger from .trionics import ( @@ -66,6 +68,8 @@ def _raise_from_no_yield_msg( `'yield'` field. ''' + __tracebackhide__: bool = True + # internal error should never get here assert msg.get('cid'), ("Received internal error at portal?") @@ -73,18 +77,22 @@ def _raise_from_no_yield_msg( # - 'stop' # - 'error' # possibly just handle msg['stop'] here! + # breakpoint() if stream._closed: raise trio.ClosedResourceError('This stream was closed') - if msg.get('stop') or stream._eoc: - log.debug(f"{stream} was stopped at remote end") + if ( + msg.get('stop') + or stream._eoc + ): + log.debug(f'{stream} was stopped at remote end') # XXX: important to set so that a new ``.receive()`` # call (likely by another task using a broadcast receiver) # doesn't accidentally pull the ``return`` message # value out of the underlying feed mem chan! - stream._eoc = True + stream._eoc: bool = True # # when the send is closed we assume the stream has # # terminated and signal this local iterator to stop @@ -93,20 +101,24 @@ def _raise_from_no_yield_msg( # XXX: this causes ``ReceiveChannel.__anext__()`` to # raise a ``StopAsyncIteration`` **and** in our catch # block below it will trigger ``.aclose()``. - raise trio.EndOfChannel from src_err + raise trio.EndOfChannel( + 'Stream ended due to msg:\n' + f'{pformat(msg)}' + ) from src_err # TODO: test that shows stream raising an expected error!!! elif msg.get('error'): # raise the error message raise unpack_error(msg, stream._ctx.chan) - # always re-raise the source error if no translation error - # case is activated above. - raise src_err - # raise RuntimeError( - # 'Unknown non-yield stream msg?\n' - # f'{msg}' - # ) + # always re-raise the source error if no translation error case + # is activated above. + raise MessagingError( + f'Context received unexpected non-error msg!?\n' + f'cid: {cid}\n' + 'received msg:\n' + f'{pformat(msg)}' + ) from src_err class MsgStream(trio.abc.Channel): @@ -161,6 +173,16 @@ class MsgStream(trio.abc.Channel): determined by the underlying protocol). ''' + # NOTE: `trio.ReceiveChannel` implements + # EOC handling as follows (aka uses it + # to gracefully exit async for loops): + # + # async def __anext__(self) -> ReceiveType: + # try: + # return await self.receive() + # except trio.EndOfChannel: + # raise StopAsyncIteration + # see ``.aclose()`` for notes on the old behaviour prior to # introducing this if self._eoc: