forked from goodboy/tractor
1
0
Fork 0

Set any `._eoc` to the err in `_raise_from_no_key_in_msg()`

Since that's what we're now doing in `MsgStream._eoc` internal
assignments (coming in future patch), do the same in this exception
re-raise-helper and include more extensive doc string detailing all
the msg-type-to-raised-error cases. Also expose a `hide_tb: bool` like
we have already in `unpack_error()`.
modden_spawn_from_client_req
Tyler Goodlet 2024-02-21 13:17:37 -05:00
parent 82dcaff8db
commit 10adf34be5
1 changed files with 43 additions and 20 deletions

View File

@ -237,8 +237,10 @@ def pack_error(
def unpack_error( def unpack_error(
msg: dict[str, Any], msg: dict[str, Any],
chan=None, chan=None,
err_type=RemoteActorError, err_type=RemoteActorError,
hide_tb: bool = True, hide_tb: bool = True,
) -> None|Exception: ) -> None|Exception:
@ -314,37 +316,61 @@ def _raise_from_no_key_in_msg(
msg: dict, msg: dict,
src_err: KeyError, src_err: KeyError,
log: StackLevelAdapter, # caller specific `log` obj log: StackLevelAdapter, # caller specific `log` obj
expect_key: str = 'yield', expect_key: str = 'yield',
stream: MsgStream | None = None, stream: MsgStream | None = None,
# allow "deeper" tbs when debugging B^o
hide_tb: bool = True,
) -> bool: ) -> bool:
''' '''
Raise an appopriate local error when a `MsgStream` msg arrives Raise an appopriate local error when a
which does not contain the expected (under normal operation) `MsgStream` msg arrives which does not
`'yield'` field. contain the expected (at least under normal
operation) `'yield'` field.
`Context` and any embedded `MsgStream` termination,
as well as remote task errors are handled in order
of priority as:
- any 'error' msg is re-boxed and raised locally as
-> `RemoteActorError`|`ContextCancelled`
- a `MsgStream` 'stop' msg is constructed, assigned
and raised locally as -> `trio.EndOfChannel`
- All other mis-keyed msgss (like say a "final result"
'return' msg, normally delivered from `Context.result()`)
are re-boxed inside a `MessagingError` with an explicit
exc content describing the missing IPC-msg-key.
''' '''
__tracebackhide__: bool = True __tracebackhide__: bool = hide_tb
# internal error should never get here # an internal error should never get here
try: try:
cid: str = msg['cid'] cid: str = msg['cid']
except KeyError as src_err: except KeyError as src_err:
raise MessagingError( raise MessagingError(
f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n' f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n'
f'cid: {cid}\n' f'cid: {cid}\n\n'
'received msg:\n'
f'{pformat(msg)}\n' f'{pformat(msg)}\n'
) from src_err ) from src_err
# TODO: test that shows stream raising an expected error!!! # TODO: test that shows stream raising an expected error!!!
# raise the error message in a boxed exception type!
if msg.get('error'): if msg.get('error'):
# raise the error message
raise unpack_error( raise unpack_error(
msg, msg,
ctx.chan, ctx.chan,
hide_tb=hide_tb,
) from None ) from None
# `MsgStream` termination msg.
elif ( elif (
msg.get('stop') msg.get('stop')
or ( or (
@ -357,29 +383,26 @@ def _raise_from_no_key_in_msg(
f'cid: {cid}\n' f'cid: {cid}\n'
) )
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
stream._eoc: bool = True
# TODO: if the a local task is already blocking on # TODO: if the a local task is already blocking on
# a `Context.result()` and thus a `.receive()` on the # a `Context.result()` and thus a `.receive()` on the
# rx-chan, we close the chan and set state ensuring that # rx-chan, we close the chan and set state ensuring that
# an eoc is raised! # an eoc is raised!
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await stream.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to # XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch # raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``. # block below it will trigger ``.aclose()``.
raise trio.EndOfChannel( eoc = trio.EndOfChannel(
f'Context stream ended due to msg:\n\n' f'Context stream ended due to msg:\n\n'
f'{pformat(msg)}\n' f'{pformat(msg)}\n'
) from src_err )
# XXX: important to set so that a new `.receive()`
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the `return` message
# value out of the underlying feed mem chan which is
# destined for the `Context.result()` call during ctx-exit!
stream._eoc: Exception = eoc
raise eoc from src_err
if ( if (
stream stream