Add a no runtime error

stream_contexts
Tyler Goodlet 2021-04-27 11:07:53 -04:00
parent 7f38b7225d
commit 897ab79946
2 changed files with 8 additions and 1 deletions

View File

@ -46,6 +46,10 @@ class ModuleNotExposed(ModuleNotFoundError):
"The requested module is not exposed for RPC" "The requested module is not exposed for RPC"
class NoRuntime(RuntimeError):
"The root actor has not been initialized yet"
def pack_error(exc: BaseException) -> Dict[str, Any]: def pack_error(exc: BaseException) -> Dict[str, Any]:
"""Create an "error message" for tranmission over """Create an "error message" for tranmission over
a channel (aka the wire). a channel (aka the wire).

View File

@ -7,6 +7,9 @@ import multiprocessing as mp
import trio import trio
from ._exceptions import NoRuntime
_current_actor: Optional['Actor'] = None # type: ignore # noqa _current_actor: Optional['Actor'] = None # type: ignore # noqa
_runtime_vars: Dict[str, Any] = { _runtime_vars: Dict[str, Any] = {
'_debug_mode': False, '_debug_mode': False,
@ -19,7 +22,7 @@ def current_actor(err_on_no_runtime: bool = True) -> 'Actor': # type: ignore #
"""Get the process-local actor instance. """Get the process-local actor instance.
""" """
if _current_actor is None and err_on_no_runtime: if _current_actor is None and err_on_no_runtime:
raise RuntimeError("No local actor has been initialized yet") raise NoRuntime("No local actor has been initialized yet")
return _current_actor return _current_actor