Compare commits

..

No commits in common. "7f29fd8dcfde9084debb716ce105cf06f13281fd" and "df641d9d316abc0647e5693bf702ef9437b6a706" have entirely different histories.

1 changed files with 17 additions and 48 deletions

View File

@ -134,19 +134,8 @@ class NoRuntime(RuntimeError):
"The root actor has not been initialized yet" "The root actor has not been initialized yet"
class StreamOverrun( class StreamOverrun(trio.TooSlowError):
RemoteActorError, "This stream was overrun by sender"
trio.TooSlowError,
):
'''
This stream was overrun by sender
'''
@property
def sender(self) -> tuple[str, str] | None:
value = self.msgdata.get('sender')
if value:
return tuple(value)
class AsyncioCancelled(Exception): class AsyncioCancelled(Exception):
@ -163,15 +152,13 @@ class MessagingError(Exception):
def pack_error( def pack_error(
exc: BaseException, exc: BaseException,
tb: str|None = None, tb: str | None = None,
cid: str|None = None,
) -> dict[str, dict]: ) -> dict[str, dict]:
''' '''
Create an "error message" which boxes a locally caught Create an "error message" encoded for wire transport via an IPC
exception's meta-data and encodes it for wire transport via an `Channel`; expected to be unpacked on the receiver side using
IPC `Channel`; expected to be unpacked (and thus unboxed) on `unpack_error()` below.
the receiver side using `unpack_error()` below.
''' '''
if tb: if tb:
@ -188,33 +175,19 @@ def pack_error(
'src_actor_uid': current_actor().uid, 'src_actor_uid': current_actor().uid,
} }
# TODO: ?just wholesale proxy `.msgdata: dict`? if isinstance(exc, ContextCancelled):
# XXX WARNING, when i swapped these ctx-semantics
# tests started hanging..???!!!???
# if msgdata := exc.getattr('msgdata', {}):
# error_msg.update(msgdata)
if (
isinstance(exc, ContextCancelled)
or isinstance(exc, StreamOverrun)
):
error_msg.update(exc.msgdata) error_msg.update(exc.msgdata)
return {'error': error_msg}
pkt: dict = {'error': error_msg}
if cid:
pkt['cid'] = cid
return pkt
def unpack_error( def unpack_error(
msg: dict[str, Any], msg: dict[str, Any],
chan=None, chan=None,
err_type=RemoteActorError, err_type=RemoteActorError
hide_tb: bool = True,
) -> None|Exception: ) -> None | Exception:
''' '''
Unpack an 'error' message from the wire Unpack an 'error' message from the wire
into a local `RemoteActorError` (subtype). into a local `RemoteActorError` (subtype).
@ -223,7 +196,7 @@ def unpack_error(
which is the responsibilitiy of the caller. which is the responsibilitiy of the caller.
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = True
error_dict: dict[str, dict] | None error_dict: dict[str, dict] | None
if ( if (
@ -336,11 +309,6 @@ def _raise_from_no_key_in_msg(
# value out of the underlying feed mem chan! # value out of the underlying feed mem chan!
stream._eoc: bool = True stream._eoc: bool = True
# TODO: if the a local task is already blocking on
# a `Context.result()` and thus a `.receive()` on the
# rx-chan, we close the chan and set state ensuring that
# an eoc is raised!
# # when the send is closed we assume the stream has # # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop # # terminated and signal this local iterator to stop
# await stream.aclose() # await stream.aclose()
@ -349,8 +317,8 @@ def _raise_from_no_key_in_msg(
# raise a ``StopAsyncIteration`` **and** in our catch # raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``. # block below it will trigger ``.aclose()``.
raise trio.EndOfChannel( raise trio.EndOfChannel(
f'Context stream ended due to msg:\n' 'Context[{cid}] stream ended due to msg:\n'
f'{pformat(msg)}' f'{pformat(msg)}'
) from src_err ) from src_err
@ -365,7 +333,8 @@ def _raise_from_no_key_in_msg(
# is activated above. # is activated above.
_type: str = 'Stream' if stream else 'Context' _type: str = 'Stream' if stream else 'Context'
raise MessagingError( raise MessagingError(
f"{_type} was expecting a '{expect_key}' message" f'{_type} was expecting a `{expect_key}` message'
" BUT received a non-error msg:\n" ' BUT received a non-`error` msg:\n'
f'{pformat(msg)}' f'cid: {cid}\n'
'{pformat(msg)}'
) from src_err ) from src_err