Compare commits

..

No commits in common. "7f29fd8dcfde9084debb716ce105cf06f13281fd" and "df641d9d316abc0647e5693bf702ef9437b6a706" have entirely different histories.

1 changed files with 17 additions and 48 deletions

View File

@ -134,19 +134,8 @@ class NoRuntime(RuntimeError):
"The root actor has not been initialized yet"
class StreamOverrun(
RemoteActorError,
trio.TooSlowError,
):
'''
This stream was overrun by sender
'''
@property
def sender(self) -> tuple[str, str] | None:
value = self.msgdata.get('sender')
if value:
return tuple(value)
class StreamOverrun(trio.TooSlowError):
"This stream was overrun by sender"
class AsyncioCancelled(Exception):
@ -164,14 +153,12 @@ class MessagingError(Exception):
def pack_error(
exc: BaseException,
tb: str | None = None,
cid: str|None = None,
) -> dict[str, dict]:
'''
Create an "error message" which boxes a locally caught
exception's meta-data and encodes it for wire transport via an
IPC `Channel`; expected to be unpacked (and thus unboxed) on
the receiver side using `unpack_error()` below.
Create an "error message" encoded for wire transport via an IPC
`Channel`; expected to be unpacked on the receiver side using
`unpack_error()` below.
'''
if tb:
@ -188,31 +175,17 @@ def pack_error(
'src_actor_uid': current_actor().uid,
}
# TODO: ?just wholesale proxy `.msgdata: dict`?
# XXX WARNING, when i swapped these ctx-semantics
# tests started hanging..???!!!???
# if msgdata := exc.getattr('msgdata', {}):
# error_msg.update(msgdata)
if (
isinstance(exc, ContextCancelled)
or isinstance(exc, StreamOverrun)
):
if isinstance(exc, ContextCancelled):
error_msg.update(exc.msgdata)
pkt: dict = {'error': error_msg}
if cid:
pkt['cid'] = cid
return pkt
return {'error': error_msg}
def unpack_error(
msg: dict[str, Any],
chan=None,
err_type=RemoteActorError,
hide_tb: bool = True,
err_type=RemoteActorError
) -> None | Exception:
'''
@ -223,7 +196,7 @@ def unpack_error(
which is the responsibilitiy of the caller.
'''
__tracebackhide__: bool = hide_tb
__tracebackhide__: bool = True
error_dict: dict[str, dict] | None
if (
@ -336,11 +309,6 @@ def _raise_from_no_key_in_msg(
# value out of the underlying feed mem chan!
stream._eoc: bool = True
# TODO: if the a local task is already blocking on
# a `Context.result()` and thus a `.receive()` on the
# rx-chan, we close the chan and set state ensuring that
# an eoc is raised!
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await stream.aclose()
@ -349,7 +317,7 @@ def _raise_from_no_key_in_msg(
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel(
f'Context stream ended due to msg:\n'
'Context[{cid}] stream ended due to msg:\n'
f'{pformat(msg)}'
) from src_err
@ -365,7 +333,8 @@ def _raise_from_no_key_in_msg(
# is activated above.
_type: str = 'Stream' if stream else 'Context'
raise MessagingError(
f"{_type} was expecting a '{expect_key}' message"
" BUT received a non-error msg:\n"
f'{pformat(msg)}'
f'{_type} was expecting a `{expect_key}` message'
' BUT received a non-`error` msg:\n'
f'cid: {cid}\n'
'{pformat(msg)}'
) from src_err