diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index a6d10de..bdd8d41 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -237,8 +237,10 @@ def pack_error( def unpack_error( msg: dict[str, Any], + chan=None, err_type=RemoteActorError, + hide_tb: bool = True, ) -> None|Exception: @@ -314,37 +316,61 @@ def _raise_from_no_key_in_msg( msg: dict, src_err: KeyError, log: StackLevelAdapter, # caller specific `log` obj + expect_key: str = 'yield', stream: MsgStream | None = None, + # allow "deeper" tbs when debugging B^o + hide_tb: bool = True, + ) -> bool: ''' - Raise an appopriate local error when a `MsgStream` msg arrives - which does not contain the expected (under normal operation) - `'yield'` field. + Raise an appopriate local error when a + `MsgStream` msg arrives which does not + contain the expected (at least under normal + operation) `'yield'` field. + + `Context` and any embedded `MsgStream` termination, + as well as remote task errors are handled in order + of priority as: + + - any 'error' msg is re-boxed and raised locally as + -> `RemoteActorError`|`ContextCancelled` + + - a `MsgStream` 'stop' msg is constructed, assigned + and raised locally as -> `trio.EndOfChannel` + + - All other mis-keyed msgss (like say a "final result" + 'return' msg, normally delivered from `Context.result()`) + are re-boxed inside a `MessagingError` with an explicit + exc content describing the missing IPC-msg-key. ''' - __tracebackhide__: bool = True + __tracebackhide__: bool = hide_tb - # internal error should never get here + # an internal error should never get here try: cid: str = msg['cid'] except KeyError as src_err: raise MessagingError( f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n' - f'cid: {cid}\n' - 'received msg:\n' + f'cid: {cid}\n\n' + f'{pformat(msg)}\n' ) from src_err # TODO: test that shows stream raising an expected error!!! + + # raise the error message in a boxed exception type! if msg.get('error'): - # raise the error message raise unpack_error( msg, ctx.chan, + hide_tb=hide_tb, + ) from None + # `MsgStream` termination msg. elif ( msg.get('stop') or ( @@ -357,29 +383,26 @@ def _raise_from_no_key_in_msg( f'cid: {cid}\n' ) - # XXX: important to set so that a new ``.receive()`` - # call (likely by another task using a broadcast receiver) - # doesn't accidentally pull the ``return`` message - # value out of the underlying feed mem chan! - stream._eoc: bool = True - # TODO: if the a local task is already blocking on # a `Context.result()` and thus a `.receive()` on the # rx-chan, we close the chan and set state ensuring that # an eoc is raised! - # # when the send is closed we assume the stream has - # # terminated and signal this local iterator to stop - # await stream.aclose() - # XXX: this causes ``ReceiveChannel.__anext__()`` to # raise a ``StopAsyncIteration`` **and** in our catch # block below it will trigger ``.aclose()``. - raise trio.EndOfChannel( + eoc = trio.EndOfChannel( f'Context stream ended due to msg:\n\n' f'{pformat(msg)}\n' - ) from src_err + ) + # XXX: important to set so that a new `.receive()` + # call (likely by another task using a broadcast receiver) + # doesn't accidentally pull the `return` message + # value out of the underlying feed mem chan which is + # destined for the `Context.result()` call during ctx-exit! + stream._eoc: Exception = eoc + raise eoc from src_err if ( stream