Compare commits

...

3 Commits

Author SHA1 Message Date
Tyler Goodlet 7f29fd8dcf Let `pack_error()` take a msg injected `cid: str|None` 2024-02-18 17:17:31 -05:00
Tyler Goodlet 7fbada8a15 Add `StreamOverrun.sender: tuple` for better handling
Since it's generally useful to know who is the cause of an overrun (say
bc you want your system to then adjust the writer side to slow tf down)
might as well pack an extra `.sender: tuple[str, str]` actor uid field
which can be relayed through `RemoteActorError` boxing. Add an extra
case for the exc-type to `unpack_error()` to match B)
2024-02-16 15:23:02 -05:00
Tyler Goodlet 286e75d342 Offer `unpack_error(hid_tb: bool)` for `pdbp` REPL config 2024-02-14 16:13:32 -05:00
1 changed files with 48 additions and 17 deletions

View File

@ -134,8 +134,19 @@ class NoRuntime(RuntimeError):
"The root actor has not been initialized yet"
class StreamOverrun(trio.TooSlowError):
"This stream was overrun by sender"
class StreamOverrun(
RemoteActorError,
trio.TooSlowError,
):
'''
This stream was overrun by sender
'''
@property
def sender(self) -> tuple[str, str] | None:
value = self.msgdata.get('sender')
if value:
return tuple(value)
class AsyncioCancelled(Exception):
@ -152,13 +163,15 @@ class MessagingError(Exception):
def pack_error(
exc: BaseException,
tb: str | None = None,
tb: str|None = None,
cid: str|None = None,
) -> dict[str, dict]:
'''
Create an "error message" encoded for wire transport via an IPC
`Channel`; expected to be unpacked on the receiver side using
`unpack_error()` below.
Create an "error message" which boxes a locally caught
exception's meta-data and encodes it for wire transport via an
IPC `Channel`; expected to be unpacked (and thus unboxed) on
the receiver side using `unpack_error()` below.
'''
if tb:
@ -175,19 +188,33 @@ def pack_error(
'src_actor_uid': current_actor().uid,
}
if isinstance(exc, ContextCancelled):
# TODO: ?just wholesale proxy `.msgdata: dict`?
# XXX WARNING, when i swapped these ctx-semantics
# tests started hanging..???!!!???
# if msgdata := exc.getattr('msgdata', {}):
# error_msg.update(msgdata)
if (
isinstance(exc, ContextCancelled)
or isinstance(exc, StreamOverrun)
):
error_msg.update(exc.msgdata)
return {'error': error_msg}
pkt: dict = {'error': error_msg}
if cid:
pkt['cid'] = cid
return pkt
def unpack_error(
msg: dict[str, Any],
chan=None,
err_type=RemoteActorError
err_type=RemoteActorError,
hide_tb: bool = True,
) -> None | Exception:
) -> None|Exception:
'''
Unpack an 'error' message from the wire
into a local `RemoteActorError` (subtype).
@ -196,7 +223,7 @@ def unpack_error(
which is the responsibilitiy of the caller.
'''
__tracebackhide__: bool = True
__tracebackhide__: bool = hide_tb
error_dict: dict[str, dict] | None
if (
@ -309,6 +336,11 @@ def _raise_from_no_key_in_msg(
# value out of the underlying feed mem chan!
stream._eoc: bool = True
# TODO: if the a local task is already blocking on
# a `Context.result()` and thus a `.receive()` on the
# rx-chan, we close the chan and set state ensuring that
# an eoc is raised!
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await stream.aclose()
@ -317,8 +349,8 @@ def _raise_from_no_key_in_msg(
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel(
'Context[{cid}] stream ended due to msg:\n'
f'{pformat(msg)}'
f'Context stream ended due to msg:\n'
f'{pformat(msg)}'
) from src_err
@ -333,8 +365,7 @@ def _raise_from_no_key_in_msg(
# is activated above.
_type: str = 'Stream' if stream else 'Context'
raise MessagingError(
f'{_type} was expecting a `{expect_key}` message'
' BUT received a non-`error` msg:\n'
f'cid: {cid}\n'
'{pformat(msg)}'
f"{_type} was expecting a '{expect_key}' message"
" BUT received a non-error msg:\n"
f'{pformat(msg)}'
) from src_err