Compare commits
3 Commits
df641d9d31
...
7f29fd8dcf
Author | SHA1 | Date |
---|---|---|
|
7f29fd8dcf | |
|
7fbada8a15 | |
|
286e75d342 |
|
@ -134,8 +134,19 @@ class NoRuntime(RuntimeError):
|
|||
"The root actor has not been initialized yet"
|
||||
|
||||
|
||||
class StreamOverrun(trio.TooSlowError):
|
||||
"This stream was overrun by sender"
|
||||
class StreamOverrun(
|
||||
RemoteActorError,
|
||||
trio.TooSlowError,
|
||||
):
|
||||
'''
|
||||
This stream was overrun by sender
|
||||
|
||||
'''
|
||||
@property
|
||||
def sender(self) -> tuple[str, str] | None:
|
||||
value = self.msgdata.get('sender')
|
||||
if value:
|
||||
return tuple(value)
|
||||
|
||||
|
||||
class AsyncioCancelled(Exception):
|
||||
|
@ -152,13 +163,15 @@ class MessagingError(Exception):
|
|||
|
||||
def pack_error(
|
||||
exc: BaseException,
|
||||
tb: str | None = None,
|
||||
tb: str|None = None,
|
||||
cid: str|None = None,
|
||||
|
||||
) -> dict[str, dict]:
|
||||
'''
|
||||
Create an "error message" encoded for wire transport via an IPC
|
||||
`Channel`; expected to be unpacked on the receiver side using
|
||||
`unpack_error()` below.
|
||||
Create an "error message" which boxes a locally caught
|
||||
exception's meta-data and encodes it for wire transport via an
|
||||
IPC `Channel`; expected to be unpacked (and thus unboxed) on
|
||||
the receiver side using `unpack_error()` below.
|
||||
|
||||
'''
|
||||
if tb:
|
||||
|
@ -175,19 +188,33 @@ def pack_error(
|
|||
'src_actor_uid': current_actor().uid,
|
||||
}
|
||||
|
||||
if isinstance(exc, ContextCancelled):
|
||||
# TODO: ?just wholesale proxy `.msgdata: dict`?
|
||||
# XXX WARNING, when i swapped these ctx-semantics
|
||||
# tests started hanging..???!!!???
|
||||
# if msgdata := exc.getattr('msgdata', {}):
|
||||
# error_msg.update(msgdata)
|
||||
if (
|
||||
isinstance(exc, ContextCancelled)
|
||||
or isinstance(exc, StreamOverrun)
|
||||
):
|
||||
error_msg.update(exc.msgdata)
|
||||
|
||||
return {'error': error_msg}
|
||||
|
||||
pkt: dict = {'error': error_msg}
|
||||
if cid:
|
||||
pkt['cid'] = cid
|
||||
|
||||
return pkt
|
||||
|
||||
|
||||
def unpack_error(
|
||||
|
||||
msg: dict[str, Any],
|
||||
chan=None,
|
||||
err_type=RemoteActorError
|
||||
err_type=RemoteActorError,
|
||||
hide_tb: bool = True,
|
||||
|
||||
) -> None | Exception:
|
||||
) -> None|Exception:
|
||||
'''
|
||||
Unpack an 'error' message from the wire
|
||||
into a local `RemoteActorError` (subtype).
|
||||
|
@ -196,7 +223,7 @@ def unpack_error(
|
|||
which is the responsibilitiy of the caller.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = True
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
||||
error_dict: dict[str, dict] | None
|
||||
if (
|
||||
|
@ -309,6 +336,11 @@ def _raise_from_no_key_in_msg(
|
|||
# value out of the underlying feed mem chan!
|
||||
stream._eoc: bool = True
|
||||
|
||||
# TODO: if the a local task is already blocking on
|
||||
# a `Context.result()` and thus a `.receive()` on the
|
||||
# rx-chan, we close the chan and set state ensuring that
|
||||
# an eoc is raised!
|
||||
|
||||
# # when the send is closed we assume the stream has
|
||||
# # terminated and signal this local iterator to stop
|
||||
# await stream.aclose()
|
||||
|
@ -317,8 +349,8 @@ def _raise_from_no_key_in_msg(
|
|||
# raise a ``StopAsyncIteration`` **and** in our catch
|
||||
# block below it will trigger ``.aclose()``.
|
||||
raise trio.EndOfChannel(
|
||||
'Context[{cid}] stream ended due to msg:\n'
|
||||
f'{pformat(msg)}'
|
||||
f'Context stream ended due to msg:\n'
|
||||
f'{pformat(msg)}'
|
||||
) from src_err
|
||||
|
||||
|
||||
|
@ -333,8 +365,7 @@ def _raise_from_no_key_in_msg(
|
|||
# is activated above.
|
||||
_type: str = 'Stream' if stream else 'Context'
|
||||
raise MessagingError(
|
||||
f'{_type} was expecting a `{expect_key}` message'
|
||||
' BUT received a non-`error` msg:\n'
|
||||
f'cid: {cid}\n'
|
||||
'{pformat(msg)}'
|
||||
f"{_type} was expecting a '{expect_key}' message"
|
||||
" BUT received a non-error msg:\n"
|
||||
f'{pformat(msg)}'
|
||||
) from src_err
|
||||
|
|
Loading…
Reference in New Issue