diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 3045b83..e449fef 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -54,6 +54,60 @@ log = get_logger(__name__) # messages? class ReceiveChannel(AsyncResource, Generic[ReceiveType]): # - use __slots__ on ``Context``? +def _raise_from_no_yield_msg( + stream: MsgStream, + msg: dict, + src_err: KeyError, + +) -> bool: + ''' + Raise an appopriate local error when a `MsgStream` msg arrives + which does not contain the expected (under normal operation) + `'yield'` field. + + ''' + # internal error should never get here + assert msg.get('cid'), ("Received internal error at portal?") + + # TODO: handle 2 cases with 3.10+ match syntax + # - 'stop' + # - 'error' + # possibly just handle msg['stop'] here! + + if stream._closed: + raise trio.ClosedResourceError('This stream was closed') + + if msg.get('stop') or stream._eoc: + log.debug(f"{stream} was stopped at remote end") + + # XXX: important to set so that a new ``.receive()`` + # call (likely by another task using a broadcast receiver) + # doesn't accidentally pull the ``return`` message + # value out of the underlying feed mem chan! + stream._eoc = True + + # # when the send is closed we assume the stream has + # # terminated and signal this local iterator to stop + # await stream.aclose() + + # XXX: this causes ``ReceiveChannel.__anext__()`` to + # raise a ``StopAsyncIteration`` **and** in our catch + # block below it will trigger ``.aclose()``. + raise trio.EndOfChannel from src_err + + # TODO: test that shows stream raising an expected error!!! + elif msg.get('error'): + # raise the error message + raise unpack_error(msg, stream._ctx.chan) + + # always re-raise the source error if no translation error + # case is activated above. + raise src_err + # raise RuntimeError( + # 'Unknown non-yield stream msg?\n' + # f'{msg}' + # ) + class MsgStream(trio.abc.Channel): ''' @@ -91,11 +145,20 @@ class MsgStream(trio.abc.Channel): # delegate directly to underlying mem channel def receive_nowait(self): msg = self._rx_chan.receive_nowait() - return msg['yield'] + try: + return msg['yield'] + except KeyError as kerr: + _raise_from_no_yield_msg( + stream=self, + msg=msg, + src_err=kerr, + ) async def receive(self): - '''Async receive a single msg from the IPC transport, the next - in sequence for this stream. + ''' + Receive a single msg from the IPC transport, the next in + sequence sent by the far end task (possibly in order as + determined by the underlying protocol). ''' # see ``.aclose()`` for notes on the old behaviour prior to @@ -110,43 +173,12 @@ class MsgStream(trio.abc.Channel): msg = await self._rx_chan.receive() return msg['yield'] - except KeyError as err: - # internal error should never get here - assert msg.get('cid'), ("Received internal error at portal?") - - # TODO: handle 2 cases with 3.10 match syntax - # - 'stop' - # - 'error' - # possibly just handle msg['stop'] here! - - if self._closed: - raise trio.ClosedResourceError('This stream was closed') - - if msg.get('stop') or self._eoc: - log.debug(f"{self} was stopped at remote end") - - # XXX: important to set so that a new ``.receive()`` - # call (likely by another task using a broadcast receiver) - # doesn't accidentally pull the ``return`` message - # value out of the underlying feed mem chan! - self._eoc = True - - # # when the send is closed we assume the stream has - # # terminated and signal this local iterator to stop - # await self.aclose() - - # XXX: this causes ``ReceiveChannel.__anext__()`` to - # raise a ``StopAsyncIteration`` **and** in our catch - # block below it will trigger ``.aclose()``. - raise trio.EndOfChannel from err - - # TODO: test that shows stream raising an expected error!!! - elif msg.get('error'): - # raise the error message - raise unpack_error(msg, self._ctx.chan) - - else: - raise + except KeyError as kerr: + _raise_from_no_yield_msg( + stream=self, + msg=msg, + src_err=kerr, + ) except ( trio.ClosedResourceError, # by self._rx_chan