forked from goodboy/tractor
1
0
Fork 0
tractor/tractor/_exceptions.py

122 lines
2.9 KiB
Python
Raw Normal View History

"""
Our classy exception set.
"""
2021-06-14 00:19:52 +00:00
from typing import Dict, Any, Optional, Type
import importlib
import builtins
import traceback
import trio
_this_mod = importlib.import_module(__name__)
class RemoteActorError(Exception):
# TODO: local recontruction of remote exception deats
"Remote actor exception bundled locally"
def __init__(
self,
message: str,
2021-06-14 00:19:52 +00:00
suberror_type: Optional[Type[BaseException]] = None,
**msgdata
) -> None:
super().__init__(message)
self.type = suberror_type
self.msgdata = msgdata
# TODO: a trio.MultiError.catch like context manager
# for catching underlying remote errors of a particular type
class InternalActorError(RemoteActorError):
"""Remote internal ``tractor`` error indicating
failure of some primitive or machinery.
"""
class TransportClosed(trio.ClosedResourceError):
"Underlying channel transport was closed prior to use"
class ContextCancelled(RemoteActorError):
"Inter-actor task context cancelled itself on the callee side."
class NoResult(RuntimeError):
"No final result is expected for this actor"
2019-01-12 22:55:28 +00:00
class ModuleNotExposed(ModuleNotFoundError):
"The requested module is not exposed for RPC"
2021-04-27 15:07:53 +00:00
class NoRuntime(RuntimeError):
"The root actor has not been initialized yet"
def pack_error(exc: BaseException) -> Dict[str, Any]:
"""Create an "error message" for tranmission over
a channel (aka the wire).
"""
return {
'error': {
'tb_str': traceback.format_exc(),
'type_str': type(exc).__name__,
}
}
def unpack_error(
msg: Dict[str, Any],
chan=None,
err_type=RemoteActorError
) -> Exception:
"""Unpack an 'error' message from the wire
into a local ``RemoteActorError``.
"""
error = msg['error']
tb_str = error.get('tb_str', '')
message = f"{chan.uid}\n" + tb_str
type_name = error['type_str']
2021-06-14 00:19:52 +00:00
suberror_type: Type[BaseException] = Exception
if type_name == 'ContextCancelled':
err_type = ContextCancelled
suberror_type = trio.Cancelled
else: # try to lookup a suitable local error type
for ns in [builtins, _this_mod, trio]:
try:
suberror_type = getattr(ns, type_name)
break
except AttributeError:
continue
exc = err_type(
message,
suberror_type=suberror_type,
# unpack other fields into error type init
**msg['error'],
)
return exc
def is_multi_cancelled(exc: BaseException) -> bool:
"""Predicate to determine if a ``trio.MultiError`` contains only
``trio.Cancelled`` sub-exceptions (and is likely the result of
cancelling a collection of subtasks.
"""
return not trio.MultiError.filter(
lambda exc: exc if not isinstance(exc, trio.Cancelled) else None,
exc,
)