Add `StreamOverrun.sender: tuple` for better handling

Since it's generally useful to know who is the cause of an overrun (say
bc you want your system to then adjust the writer side to slow tf down)
might as well pack an extra `.sender: tuple[str, str]` actor uid field
which can be relayed through `RemoteActorError` boxing. Add an extra
case for the exc-type to `unpack_error()` to match B)
ctx_cancel_semantics_and_overruns_REVERSED_FACEPALM
Tyler Goodlet 2024-02-16 15:23:00 -05:00
parent a86275996c
commit 9fc9b10b53
1 changed files with 22 additions and 3 deletions

View File

@ -134,8 +134,19 @@ class NoRuntime(RuntimeError):
"The root actor has not been initialized yet"
class StreamOverrun(trio.TooSlowError):
"This stream was overrun by sender"
class StreamOverrun(
RemoteActorError,
trio.TooSlowError,
):
'''
This stream was overrun by sender
'''
@property
def sender(self) -> tuple[str, str] | None:
value = self.msgdata.get('sender')
if value:
return tuple(value)
class AsyncioCancelled(Exception):
@ -175,7 +186,15 @@ def pack_error(
'src_actor_uid': current_actor().uid,
}
if isinstance(exc, ContextCancelled):
# TODO: ?just wholesale proxy `.msgdata: dict`?
# XXX WARNING, when i swapped these ctx-semantics
# tests started hanging..???!!!???
# if msgdata := exc.getattr('msgdata', {}):
# error_msg.update(msgdata)
if (
isinstance(exc, ContextCancelled)
or isinstance(exc, StreamOverrun)
):
error_msg.update(exc.msgdata)
return {'error': error_msg}