Compare commits

...

3 Commits

Author SHA1 Message Date
Tyler Goodlet 7f29fd8dcf Let `pack_error()` take a msg injected `cid: str|None` 2024-02-18 17:17:31 -05:00
Tyler Goodlet 7fbada8a15 Add `StreamOverrun.sender: tuple` for better handling
Since it's generally useful to know who is the cause of an overrun (say
bc you want your system to then adjust the writer side to slow tf down)
might as well pack an extra `.sender: tuple[str, str]` actor uid field
which can be relayed through `RemoteActorError` boxing. Add an extra
case for the exc-type to `unpack_error()` to match B)
2024-02-16 15:23:02 -05:00
Tyler Goodlet 286e75d342 Offer `unpack_error(hid_tb: bool)` for `pdbp` REPL config 2024-02-14 16:13:32 -05:00
1 changed files with 48 additions and 17 deletions

View File

@ -134,8 +134,19 @@ class NoRuntime(RuntimeError):
"The root actor has not been initialized yet" "The root actor has not been initialized yet"
class StreamOverrun(trio.TooSlowError): class StreamOverrun(
"This stream was overrun by sender" RemoteActorError,
trio.TooSlowError,
):
'''
This stream was overrun by sender
'''
@property
def sender(self) -> tuple[str, str] | None:
value = self.msgdata.get('sender')
if value:
return tuple(value)
class AsyncioCancelled(Exception): class AsyncioCancelled(Exception):
@ -152,13 +163,15 @@ class MessagingError(Exception):
def pack_error( def pack_error(
exc: BaseException, exc: BaseException,
tb: str | None = None, tb: str|None = None,
cid: str|None = None,
) -> dict[str, dict]: ) -> dict[str, dict]:
''' '''
Create an "error message" encoded for wire transport via an IPC Create an "error message" which boxes a locally caught
`Channel`; expected to be unpacked on the receiver side using exception's meta-data and encodes it for wire transport via an
`unpack_error()` below. IPC `Channel`; expected to be unpacked (and thus unboxed) on
the receiver side using `unpack_error()` below.
''' '''
if tb: if tb:
@ -175,19 +188,33 @@ def pack_error(
'src_actor_uid': current_actor().uid, 'src_actor_uid': current_actor().uid,
} }
if isinstance(exc, ContextCancelled): # TODO: ?just wholesale proxy `.msgdata: dict`?
# XXX WARNING, when i swapped these ctx-semantics
# tests started hanging..???!!!???
# if msgdata := exc.getattr('msgdata', {}):
# error_msg.update(msgdata)
if (
isinstance(exc, ContextCancelled)
or isinstance(exc, StreamOverrun)
):
error_msg.update(exc.msgdata) error_msg.update(exc.msgdata)
return {'error': error_msg}
pkt: dict = {'error': error_msg}
if cid:
pkt['cid'] = cid
return pkt
def unpack_error( def unpack_error(
msg: dict[str, Any], msg: dict[str, Any],
chan=None, chan=None,
err_type=RemoteActorError err_type=RemoteActorError,
hide_tb: bool = True,
) -> None | Exception: ) -> None|Exception:
''' '''
Unpack an 'error' message from the wire Unpack an 'error' message from the wire
into a local `RemoteActorError` (subtype). into a local `RemoteActorError` (subtype).
@ -196,7 +223,7 @@ def unpack_error(
which is the responsibilitiy of the caller. which is the responsibilitiy of the caller.
''' '''
__tracebackhide__: bool = True __tracebackhide__: bool = hide_tb
error_dict: dict[str, dict] | None error_dict: dict[str, dict] | None
if ( if (
@ -309,6 +336,11 @@ def _raise_from_no_key_in_msg(
# value out of the underlying feed mem chan! # value out of the underlying feed mem chan!
stream._eoc: bool = True stream._eoc: bool = True
# TODO: if the a local task is already blocking on
# a `Context.result()` and thus a `.receive()` on the
# rx-chan, we close the chan and set state ensuring that
# an eoc is raised!
# # when the send is closed we assume the stream has # # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop # # terminated and signal this local iterator to stop
# await stream.aclose() # await stream.aclose()
@ -317,8 +349,8 @@ def _raise_from_no_key_in_msg(
# raise a ``StopAsyncIteration`` **and** in our catch # raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``. # block below it will trigger ``.aclose()``.
raise trio.EndOfChannel( raise trio.EndOfChannel(
'Context[{cid}] stream ended due to msg:\n' f'Context stream ended due to msg:\n'
f'{pformat(msg)}' f'{pformat(msg)}'
) from src_err ) from src_err
@ -333,8 +365,7 @@ def _raise_from_no_key_in_msg(
# is activated above. # is activated above.
_type: str = 'Stream' if stream else 'Context' _type: str = 'Stream' if stream else 'Context'
raise MessagingError( raise MessagingError(
f'{_type} was expecting a `{expect_key}` message' f"{_type} was expecting a '{expect_key}' message"
' BUT received a non-`error` msg:\n' " BUT received a non-error msg:\n"
f'cid: {cid}\n' f'{pformat(msg)}'
'{pformat(msg)}'
) from src_err ) from src_err