Compare commits
No commits in common. "7f29fd8dcfde9084debb716ce105cf06f13281fd" and "df641d9d316abc0647e5693bf702ef9437b6a706" have entirely different histories.
7f29fd8dcf
...
df641d9d31
|
@ -134,19 +134,8 @@ class NoRuntime(RuntimeError):
|
||||||
"The root actor has not been initialized yet"
|
"The root actor has not been initialized yet"
|
||||||
|
|
||||||
|
|
||||||
class StreamOverrun(
|
class StreamOverrun(trio.TooSlowError):
|
||||||
RemoteActorError,
|
"This stream was overrun by sender"
|
||||||
trio.TooSlowError,
|
|
||||||
):
|
|
||||||
'''
|
|
||||||
This stream was overrun by sender
|
|
||||||
|
|
||||||
'''
|
|
||||||
@property
|
|
||||||
def sender(self) -> tuple[str, str] | None:
|
|
||||||
value = self.msgdata.get('sender')
|
|
||||||
if value:
|
|
||||||
return tuple(value)
|
|
||||||
|
|
||||||
|
|
||||||
class AsyncioCancelled(Exception):
|
class AsyncioCancelled(Exception):
|
||||||
|
@ -163,15 +152,13 @@ class MessagingError(Exception):
|
||||||
|
|
||||||
def pack_error(
|
def pack_error(
|
||||||
exc: BaseException,
|
exc: BaseException,
|
||||||
tb: str|None = None,
|
tb: str | None = None,
|
||||||
cid: str|None = None,
|
|
||||||
|
|
||||||
) -> dict[str, dict]:
|
) -> dict[str, dict]:
|
||||||
'''
|
'''
|
||||||
Create an "error message" which boxes a locally caught
|
Create an "error message" encoded for wire transport via an IPC
|
||||||
exception's meta-data and encodes it for wire transport via an
|
`Channel`; expected to be unpacked on the receiver side using
|
||||||
IPC `Channel`; expected to be unpacked (and thus unboxed) on
|
`unpack_error()` below.
|
||||||
the receiver side using `unpack_error()` below.
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
if tb:
|
if tb:
|
||||||
|
@ -188,33 +175,19 @@ def pack_error(
|
||||||
'src_actor_uid': current_actor().uid,
|
'src_actor_uid': current_actor().uid,
|
||||||
}
|
}
|
||||||
|
|
||||||
# TODO: ?just wholesale proxy `.msgdata: dict`?
|
if isinstance(exc, ContextCancelled):
|
||||||
# XXX WARNING, when i swapped these ctx-semantics
|
|
||||||
# tests started hanging..???!!!???
|
|
||||||
# if msgdata := exc.getattr('msgdata', {}):
|
|
||||||
# error_msg.update(msgdata)
|
|
||||||
if (
|
|
||||||
isinstance(exc, ContextCancelled)
|
|
||||||
or isinstance(exc, StreamOverrun)
|
|
||||||
):
|
|
||||||
error_msg.update(exc.msgdata)
|
error_msg.update(exc.msgdata)
|
||||||
|
|
||||||
|
return {'error': error_msg}
|
||||||
pkt: dict = {'error': error_msg}
|
|
||||||
if cid:
|
|
||||||
pkt['cid'] = cid
|
|
||||||
|
|
||||||
return pkt
|
|
||||||
|
|
||||||
|
|
||||||
def unpack_error(
|
def unpack_error(
|
||||||
|
|
||||||
msg: dict[str, Any],
|
msg: dict[str, Any],
|
||||||
chan=None,
|
chan=None,
|
||||||
err_type=RemoteActorError,
|
err_type=RemoteActorError
|
||||||
hide_tb: bool = True,
|
|
||||||
|
|
||||||
) -> None|Exception:
|
) -> None | Exception:
|
||||||
'''
|
'''
|
||||||
Unpack an 'error' message from the wire
|
Unpack an 'error' message from the wire
|
||||||
into a local `RemoteActorError` (subtype).
|
into a local `RemoteActorError` (subtype).
|
||||||
|
@ -223,7 +196,7 @@ def unpack_error(
|
||||||
which is the responsibilitiy of the caller.
|
which is the responsibilitiy of the caller.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = True
|
||||||
|
|
||||||
error_dict: dict[str, dict] | None
|
error_dict: dict[str, dict] | None
|
||||||
if (
|
if (
|
||||||
|
@ -336,11 +309,6 @@ def _raise_from_no_key_in_msg(
|
||||||
# value out of the underlying feed mem chan!
|
# value out of the underlying feed mem chan!
|
||||||
stream._eoc: bool = True
|
stream._eoc: bool = True
|
||||||
|
|
||||||
# TODO: if the a local task is already blocking on
|
|
||||||
# a `Context.result()` and thus a `.receive()` on the
|
|
||||||
# rx-chan, we close the chan and set state ensuring that
|
|
||||||
# an eoc is raised!
|
|
||||||
|
|
||||||
# # when the send is closed we assume the stream has
|
# # when the send is closed we assume the stream has
|
||||||
# # terminated and signal this local iterator to stop
|
# # terminated and signal this local iterator to stop
|
||||||
# await stream.aclose()
|
# await stream.aclose()
|
||||||
|
@ -349,8 +317,8 @@ def _raise_from_no_key_in_msg(
|
||||||
# raise a ``StopAsyncIteration`` **and** in our catch
|
# raise a ``StopAsyncIteration`` **and** in our catch
|
||||||
# block below it will trigger ``.aclose()``.
|
# block below it will trigger ``.aclose()``.
|
||||||
raise trio.EndOfChannel(
|
raise trio.EndOfChannel(
|
||||||
f'Context stream ended due to msg:\n'
|
'Context[{cid}] stream ended due to msg:\n'
|
||||||
f'{pformat(msg)}'
|
f'{pformat(msg)}'
|
||||||
) from src_err
|
) from src_err
|
||||||
|
|
||||||
|
|
||||||
|
@ -365,7 +333,8 @@ def _raise_from_no_key_in_msg(
|
||||||
# is activated above.
|
# is activated above.
|
||||||
_type: str = 'Stream' if stream else 'Context'
|
_type: str = 'Stream' if stream else 'Context'
|
||||||
raise MessagingError(
|
raise MessagingError(
|
||||||
f"{_type} was expecting a '{expect_key}' message"
|
f'{_type} was expecting a `{expect_key}` message'
|
||||||
" BUT received a non-error msg:\n"
|
' BUT received a non-`error` msg:\n'
|
||||||
f'{pformat(msg)}'
|
f'cid: {cid}\n'
|
||||||
|
'{pformat(msg)}'
|
||||||
) from src_err
|
) from src_err
|
||||||
|
|
Loading…
Reference in New Issue