Compare commits
3 Commits
df641d9d31
...
7f29fd8dcf
Author | SHA1 | Date |
---|---|---|
|
7f29fd8dcf | |
|
7fbada8a15 | |
|
286e75d342 |
|
@ -134,8 +134,19 @@ class NoRuntime(RuntimeError):
|
||||||
"The root actor has not been initialized yet"
|
"The root actor has not been initialized yet"
|
||||||
|
|
||||||
|
|
||||||
class StreamOverrun(trio.TooSlowError):
|
class StreamOverrun(
|
||||||
"This stream was overrun by sender"
|
RemoteActorError,
|
||||||
|
trio.TooSlowError,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
This stream was overrun by sender
|
||||||
|
|
||||||
|
'''
|
||||||
|
@property
|
||||||
|
def sender(self) -> tuple[str, str] | None:
|
||||||
|
value = self.msgdata.get('sender')
|
||||||
|
if value:
|
||||||
|
return tuple(value)
|
||||||
|
|
||||||
|
|
||||||
class AsyncioCancelled(Exception):
|
class AsyncioCancelled(Exception):
|
||||||
|
@ -152,13 +163,15 @@ class MessagingError(Exception):
|
||||||
|
|
||||||
def pack_error(
|
def pack_error(
|
||||||
exc: BaseException,
|
exc: BaseException,
|
||||||
tb: str | None = None,
|
tb: str|None = None,
|
||||||
|
cid: str|None = None,
|
||||||
|
|
||||||
) -> dict[str, dict]:
|
) -> dict[str, dict]:
|
||||||
'''
|
'''
|
||||||
Create an "error message" encoded for wire transport via an IPC
|
Create an "error message" which boxes a locally caught
|
||||||
`Channel`; expected to be unpacked on the receiver side using
|
exception's meta-data and encodes it for wire transport via an
|
||||||
`unpack_error()` below.
|
IPC `Channel`; expected to be unpacked (and thus unboxed) on
|
||||||
|
the receiver side using `unpack_error()` below.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
if tb:
|
if tb:
|
||||||
|
@ -175,19 +188,33 @@ def pack_error(
|
||||||
'src_actor_uid': current_actor().uid,
|
'src_actor_uid': current_actor().uid,
|
||||||
}
|
}
|
||||||
|
|
||||||
if isinstance(exc, ContextCancelled):
|
# TODO: ?just wholesale proxy `.msgdata: dict`?
|
||||||
|
# XXX WARNING, when i swapped these ctx-semantics
|
||||||
|
# tests started hanging..???!!!???
|
||||||
|
# if msgdata := exc.getattr('msgdata', {}):
|
||||||
|
# error_msg.update(msgdata)
|
||||||
|
if (
|
||||||
|
isinstance(exc, ContextCancelled)
|
||||||
|
or isinstance(exc, StreamOverrun)
|
||||||
|
):
|
||||||
error_msg.update(exc.msgdata)
|
error_msg.update(exc.msgdata)
|
||||||
|
|
||||||
return {'error': error_msg}
|
|
||||||
|
pkt: dict = {'error': error_msg}
|
||||||
|
if cid:
|
||||||
|
pkt['cid'] = cid
|
||||||
|
|
||||||
|
return pkt
|
||||||
|
|
||||||
|
|
||||||
def unpack_error(
|
def unpack_error(
|
||||||
|
|
||||||
msg: dict[str, Any],
|
msg: dict[str, Any],
|
||||||
chan=None,
|
chan=None,
|
||||||
err_type=RemoteActorError
|
err_type=RemoteActorError,
|
||||||
|
hide_tb: bool = True,
|
||||||
|
|
||||||
) -> None | Exception:
|
) -> None|Exception:
|
||||||
'''
|
'''
|
||||||
Unpack an 'error' message from the wire
|
Unpack an 'error' message from the wire
|
||||||
into a local `RemoteActorError` (subtype).
|
into a local `RemoteActorError` (subtype).
|
||||||
|
@ -196,7 +223,7 @@ def unpack_error(
|
||||||
which is the responsibilitiy of the caller.
|
which is the responsibilitiy of the caller.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = True
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
error_dict: dict[str, dict] | None
|
error_dict: dict[str, dict] | None
|
||||||
if (
|
if (
|
||||||
|
@ -309,6 +336,11 @@ def _raise_from_no_key_in_msg(
|
||||||
# value out of the underlying feed mem chan!
|
# value out of the underlying feed mem chan!
|
||||||
stream._eoc: bool = True
|
stream._eoc: bool = True
|
||||||
|
|
||||||
|
# TODO: if the a local task is already blocking on
|
||||||
|
# a `Context.result()` and thus a `.receive()` on the
|
||||||
|
# rx-chan, we close the chan and set state ensuring that
|
||||||
|
# an eoc is raised!
|
||||||
|
|
||||||
# # when the send is closed we assume the stream has
|
# # when the send is closed we assume the stream has
|
||||||
# # terminated and signal this local iterator to stop
|
# # terminated and signal this local iterator to stop
|
||||||
# await stream.aclose()
|
# await stream.aclose()
|
||||||
|
@ -317,7 +349,7 @@ def _raise_from_no_key_in_msg(
|
||||||
# raise a ``StopAsyncIteration`` **and** in our catch
|
# raise a ``StopAsyncIteration`` **and** in our catch
|
||||||
# block below it will trigger ``.aclose()``.
|
# block below it will trigger ``.aclose()``.
|
||||||
raise trio.EndOfChannel(
|
raise trio.EndOfChannel(
|
||||||
'Context[{cid}] stream ended due to msg:\n'
|
f'Context stream ended due to msg:\n'
|
||||||
f'{pformat(msg)}'
|
f'{pformat(msg)}'
|
||||||
) from src_err
|
) from src_err
|
||||||
|
|
||||||
|
@ -333,8 +365,7 @@ def _raise_from_no_key_in_msg(
|
||||||
# is activated above.
|
# is activated above.
|
||||||
_type: str = 'Stream' if stream else 'Context'
|
_type: str = 'Stream' if stream else 'Context'
|
||||||
raise MessagingError(
|
raise MessagingError(
|
||||||
f'{_type} was expecting a `{expect_key}` message'
|
f"{_type} was expecting a '{expect_key}' message"
|
||||||
' BUT received a non-`error` msg:\n'
|
" BUT received a non-error msg:\n"
|
||||||
f'cid: {cid}\n'
|
f'{pformat(msg)}'
|
||||||
'{pformat(msg)}'
|
|
||||||
) from src_err
|
) from src_err
|
||||||
|
|
Loading…
Reference in New Issue