forked from goodboy/tractor
Add `StreamOverrun.sender: tuple` for better handling
Since it's generally useful to know who is the cause of an overrun (say bc you want your system to then adjust the writer side to slow tf down) might as well pack an extra `.sender: tuple[str, str]` actor uid field which can be relayed through `RemoteActorError` boxing. Add an extra case for the exc-type to `unpack_error()` to match B)modden_spawn_from_client_req
parent
286e75d342
commit
7fbada8a15
|
@ -134,8 +134,19 @@ class NoRuntime(RuntimeError):
|
||||||
"The root actor has not been initialized yet"
|
"The root actor has not been initialized yet"
|
||||||
|
|
||||||
|
|
||||||
class StreamOverrun(trio.TooSlowError):
|
class StreamOverrun(
|
||||||
"This stream was overrun by sender"
|
RemoteActorError,
|
||||||
|
trio.TooSlowError,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
This stream was overrun by sender
|
||||||
|
|
||||||
|
'''
|
||||||
|
@property
|
||||||
|
def sender(self) -> tuple[str, str] | None:
|
||||||
|
value = self.msgdata.get('sender')
|
||||||
|
if value:
|
||||||
|
return tuple(value)
|
||||||
|
|
||||||
|
|
||||||
class AsyncioCancelled(Exception):
|
class AsyncioCancelled(Exception):
|
||||||
|
@ -175,7 +186,15 @@ def pack_error(
|
||||||
'src_actor_uid': current_actor().uid,
|
'src_actor_uid': current_actor().uid,
|
||||||
}
|
}
|
||||||
|
|
||||||
if isinstance(exc, ContextCancelled):
|
# TODO: ?just wholesale proxy `.msgdata: dict`?
|
||||||
|
# XXX WARNING, when i swapped these ctx-semantics
|
||||||
|
# tests started hanging..???!!!???
|
||||||
|
# if msgdata := exc.getattr('msgdata', {}):
|
||||||
|
# error_msg.update(msgdata)
|
||||||
|
if (
|
||||||
|
isinstance(exc, ContextCancelled)
|
||||||
|
or isinstance(exc, StreamOverrun)
|
||||||
|
):
|
||||||
error_msg.update(exc.msgdata)
|
error_msg.update(exc.msgdata)
|
||||||
|
|
||||||
return {'error': error_msg}
|
return {'error': error_msg}
|
||||||
|
|
Loading…
Reference in New Issue