commit
f427c98cf6
|
@ -26,6 +26,11 @@ async def main():
|
||||||
├─ python -m tractor._child --uid ('name_error', 'a7caf490 ...)
|
├─ python -m tractor._child --uid ('name_error', 'a7caf490 ...)
|
||||||
`-python -m tractor._child --uid ('spawn_error', '52ee14a5 ...)
|
`-python -m tractor._child --uid ('spawn_error', '52ee14a5 ...)
|
||||||
`-python -m tractor._child --uid ('name_error', '3391222c ...)
|
`-python -m tractor._child --uid ('name_error', '3391222c ...)
|
||||||
|
|
||||||
|
Order of failure:
|
||||||
|
- nested name_error sub-sub-actor
|
||||||
|
- root actor should then fail on assert
|
||||||
|
- program termination
|
||||||
"""
|
"""
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as n:
|
||||||
|
|
||||||
|
|
|
@ -343,11 +343,14 @@ def test_multi_subactors_root_errors(spawn):
|
||||||
|
|
||||||
# should now get attached in root with assert error
|
# should now get attached in root with assert error
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
|
|
||||||
# should have come just after priot prompt
|
# should have come just after priot prompt
|
||||||
assert "Cancelling nursery in ('spawn_error'," in before
|
|
||||||
assert "Attaching to pdb in crashed actor: ('arbiter'" in before
|
assert "Attaching to pdb in crashed actor: ('arbiter'" in before
|
||||||
assert "AssertionError" in before
|
assert "AssertionError" in before
|
||||||
|
|
||||||
|
# warnings assert we probably don't need
|
||||||
|
# assert "Cancelling nursery in ('spawn_error'," in before
|
||||||
|
|
||||||
# continue again
|
# continue again
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
child.expect(pexpect.EOF)
|
child.expect(pexpect.EOF)
|
||||||
|
@ -369,6 +372,9 @@ def test_multi_nested_subactors_error_through_nurseries(spawn):
|
||||||
|
|
||||||
child = spawn('multi_nested_subactors_error_up_through_nurseries')
|
child = spawn('multi_nested_subactors_error_up_through_nurseries')
|
||||||
|
|
||||||
|
# startup time can be iffy
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
for i in range(12):
|
for i in range(12):
|
||||||
try:
|
try:
|
||||||
child.expect(r"\(Pdb\+\+\)")
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
|
@ -25,7 +25,8 @@ from .log import get_logger
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
pack_error,
|
pack_error,
|
||||||
unpack_error,
|
unpack_error,
|
||||||
ModuleNotExposed
|
ModuleNotExposed,
|
||||||
|
is_multi_cancelled,
|
||||||
)
|
)
|
||||||
from . import _debug
|
from . import _debug
|
||||||
from ._discovery import get_arbiter
|
from ._discovery import get_arbiter
|
||||||
|
@ -129,14 +130,19 @@ async def _invoke(
|
||||||
|
|
||||||
except (Exception, trio.MultiError) as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
|
|
||||||
if not isinstance(err, trio.ClosedResourceError):
|
# TODO: maybe we'll want differnet "levels" of debugging
|
||||||
log.exception("Actor crashed:")
|
# eventualy such as ('app', 'supervisory', 'runtime') ?
|
||||||
|
if not isinstance(err, trio.ClosedResourceError) and (
|
||||||
|
not is_multi_cancelled(err)
|
||||||
|
):
|
||||||
# XXX: is there any case where we'll want to debug IPC
|
# XXX: is there any case where we'll want to debug IPC
|
||||||
# disconnects? I can't think of a reason that inspecting
|
# disconnects? I can't think of a reason that inspecting
|
||||||
# this type of failure will be useful for respawns or
|
# this type of failure will be useful for respawns or
|
||||||
# recovery logic - the only case is some kind of strange bug
|
# recovery logic - the only case is some kind of strange bug
|
||||||
# in `trio` itself?
|
# in `trio` itself?
|
||||||
await _debug._maybe_enter_pm(err)
|
entered = await _debug._maybe_enter_pm(err)
|
||||||
|
if not entered:
|
||||||
|
log.exception("Actor crashed:")
|
||||||
|
|
||||||
# always ship errors back to caller
|
# always ship errors back to caller
|
||||||
err_msg = pack_error(err)
|
err_msg = pack_error(err)
|
||||||
|
@ -144,7 +150,7 @@ async def _invoke(
|
||||||
try:
|
try:
|
||||||
await chan.send(err_msg)
|
await chan.send(err_msg)
|
||||||
except trio.ClosedResourceError:
|
except trio.ClosedResourceError:
|
||||||
log.exception(
|
log.warning(
|
||||||
f"Failed to ship error to caller @ {chan.uid}")
|
f"Failed to ship error to caller @ {chan.uid}")
|
||||||
if cs is None:
|
if cs is None:
|
||||||
# error is from above code not from rpc invocation
|
# error is from above code not from rpc invocation
|
||||||
|
|
|
@ -15,6 +15,7 @@ from .log import get_logger
|
||||||
from . import _state
|
from . import _state
|
||||||
from ._discovery import get_root
|
from ._discovery import get_root
|
||||||
from ._state import is_root_process
|
from ._state import is_root_process
|
||||||
|
from ._exceptions import is_multi_cancelled
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# wtf: only exported when installed in dev mode?
|
# wtf: only exported when installed in dev mode?
|
||||||
|
@ -121,13 +122,16 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
|
||||||
"""
|
"""
|
||||||
task_name = trio.lowlevel.current_task()
|
task_name = trio.lowlevel.current_task()
|
||||||
try:
|
try:
|
||||||
log.error(f"TTY BEING ACQUIRED by {task_name}:{uid}")
|
log.debug(
|
||||||
|
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}")
|
||||||
await _debug_lock.acquire()
|
await _debug_lock.acquire()
|
||||||
log.error(f"TTY lock acquired by {task_name}:{uid}")
|
|
||||||
|
log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}")
|
||||||
yield
|
yield
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
_debug_lock.release()
|
_debug_lock.release()
|
||||||
log.error(f"TTY lock released by {task_name}:{uid}")
|
log.debug(f"TTY lock released, remote task: {task_name}:{uid}")
|
||||||
|
|
||||||
|
|
||||||
# @contextmanager
|
# @contextmanager
|
||||||
|
@ -288,7 +292,7 @@ breakpoint = partial(
|
||||||
|
|
||||||
|
|
||||||
def _post_mortem(actor):
|
def _post_mortem(actor):
|
||||||
log.critical(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
|
log.runtime(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
|
||||||
pdb = _mk_pdb()
|
pdb = _mk_pdb()
|
||||||
|
|
||||||
# custom Pdb post-mortem entry
|
# custom Pdb post-mortem entry
|
||||||
|
@ -318,10 +322,11 @@ async def _maybe_enter_pm(err):
|
||||||
|
|
||||||
# Really we just want to mostly avoid catching KBIs here so there
|
# Really we just want to mostly avoid catching KBIs here so there
|
||||||
# might be a simpler check we can do?
|
# might be a simpler check we can do?
|
||||||
and trio.MultiError.filter(
|
and not is_multi_cancelled(err)
|
||||||
lambda exc: exc if not isinstance(exc, trio.Cancelled) else None,
|
|
||||||
err,
|
|
||||||
)
|
|
||||||
):
|
):
|
||||||
log.warning("Actor crashed, entering debug mode")
|
log.debug("Actor crashed, entering debug mode")
|
||||||
await post_mortem()
|
await post_mortem()
|
||||||
|
return True
|
||||||
|
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
"""
|
"""
|
||||||
Our classy exception set.
|
Our classy exception set.
|
||||||
"""
|
"""
|
||||||
|
from typing import Dict, Any
|
||||||
import importlib
|
import importlib
|
||||||
import builtins
|
import builtins
|
||||||
import traceback
|
import traceback
|
||||||
|
@ -14,7 +15,7 @@ _this_mod = importlib.import_module(__name__)
|
||||||
class RemoteActorError(Exception):
|
class RemoteActorError(Exception):
|
||||||
# TODO: local recontruction of remote exception deats
|
# TODO: local recontruction of remote exception deats
|
||||||
"Remote actor exception bundled locally"
|
"Remote actor exception bundled locally"
|
||||||
def __init__(self, message, type_str, **msgdata):
|
def __init__(self, message, type_str, **msgdata) -> None:
|
||||||
super().__init__(message)
|
super().__init__(message)
|
||||||
for ns in [builtins, _this_mod, trio]:
|
for ns in [builtins, _this_mod, trio]:
|
||||||
try:
|
try:
|
||||||
|
@ -45,7 +46,7 @@ class ModuleNotExposed(ModuleNotFoundError):
|
||||||
"The requested module is not exposed for RPC"
|
"The requested module is not exposed for RPC"
|
||||||
|
|
||||||
|
|
||||||
def pack_error(exc):
|
def pack_error(exc: BaseException) -> Dict[str, Any]:
|
||||||
"""Create an "error message" for tranmission over
|
"""Create an "error message" for tranmission over
|
||||||
a channel (aka the wire).
|
a channel (aka the wire).
|
||||||
"""
|
"""
|
||||||
|
@ -57,7 +58,11 @@ def pack_error(exc):
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def unpack_error(msg, chan=None, err_type=RemoteActorError):
|
def unpack_error(
|
||||||
|
msg: Dict[str, Any],
|
||||||
|
chan=None,
|
||||||
|
err_type=RemoteActorError
|
||||||
|
) -> Exception:
|
||||||
"""Unpack an 'error' message from the wire
|
"""Unpack an 'error' message from the wire
|
||||||
into a local ``RemoteActorError``.
|
into a local ``RemoteActorError``.
|
||||||
"""
|
"""
|
||||||
|
@ -66,3 +71,15 @@ def unpack_error(msg, chan=None, err_type=RemoteActorError):
|
||||||
f"{chan.uid}\n" + tb_str,
|
f"{chan.uid}\n" + tb_str,
|
||||||
**msg['error'],
|
**msg['error'],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def is_multi_cancelled(exc: BaseException) -> bool:
|
||||||
|
"""Predicate to determine if a ``trio.MultiError`` contains only
|
||||||
|
``trio.Cancelled`` sub-exceptions (and is likely the result of
|
||||||
|
cancelling a collection of subtasks.
|
||||||
|
|
||||||
|
"""
|
||||||
|
return not trio.MultiError.filter(
|
||||||
|
lambda exc: exc if not isinstance(exc, trio.Cancelled) else None,
|
||||||
|
exc,
|
||||||
|
)
|
||||||
|
|
|
@ -51,7 +51,7 @@ class MsgpackStream:
|
||||||
data = await self.stream.receive_some(2**10)
|
data = await self.stream.receive_some(2**10)
|
||||||
log.trace(f"received {data}") # type: ignore
|
log.trace(f"received {data}") # type: ignore
|
||||||
except trio.BrokenResourceError:
|
except trio.BrokenResourceError:
|
||||||
log.error(f"Stream connection {self.raddr} broke")
|
log.warning(f"Stream connection {self.raddr} broke")
|
||||||
return
|
return
|
||||||
|
|
||||||
if data == b'':
|
if data == b'':
|
||||||
|
|
|
@ -13,6 +13,7 @@ from ._state import current_actor
|
||||||
from .log import get_logger, get_loglevel
|
from .log import get_logger, get_loglevel
|
||||||
from ._actor import Actor
|
from ._actor import Actor
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
|
from ._exceptions import is_multi_cancelled
|
||||||
from . import _state
|
from . import _state
|
||||||
from . import _spawn
|
from . import _spawn
|
||||||
|
|
||||||
|
@ -246,7 +247,9 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
|
||||||
# For now, shield both.
|
# For now, shield both.
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
etype = type(err)
|
etype = type(err)
|
||||||
if etype in (trio.Cancelled, KeyboardInterrupt):
|
if etype in (trio.Cancelled, KeyboardInterrupt) or (
|
||||||
|
is_multi_cancelled(err)
|
||||||
|
):
|
||||||
log.warning(
|
log.warning(
|
||||||
f"Nursery for {current_actor().uid} was "
|
f"Nursery for {current_actor().uid} was "
|
||||||
f"cancelled with {etype}")
|
f"cancelled with {etype}")
|
||||||
|
|
|
@ -11,7 +11,7 @@ from ._state import ActorContextInfo
|
||||||
|
|
||||||
|
|
||||||
_proj_name = 'tractor'
|
_proj_name = 'tractor'
|
||||||
_default_loglevel = None
|
_default_loglevel = 'ERROR'
|
||||||
|
|
||||||
# Super sexy formatting thanks to ``colorlog``.
|
# Super sexy formatting thanks to ``colorlog``.
|
||||||
# (NOTE: we use the '{' format style)
|
# (NOTE: we use the '{' format style)
|
||||||
|
@ -31,11 +31,13 @@ LEVELS = {
|
||||||
'GARBAGE': 1,
|
'GARBAGE': 1,
|
||||||
'TRACE': 5,
|
'TRACE': 5,
|
||||||
'PROFILE': 15,
|
'PROFILE': 15,
|
||||||
|
'RUNTIME': 500,
|
||||||
'QUIET': 1000,
|
'QUIET': 1000,
|
||||||
}
|
}
|
||||||
STD_PALETTE = {
|
STD_PALETTE = {
|
||||||
'CRITICAL': 'red',
|
'CRITICAL': 'red',
|
||||||
'ERROR': 'red',
|
'ERROR': 'red',
|
||||||
|
'RUNTIME': 'white',
|
||||||
'WARNING': 'yellow',
|
'WARNING': 'yellow',
|
||||||
'INFO': 'green',
|
'INFO': 'green',
|
||||||
'DEBUG': 'white',
|
'DEBUG': 'white',
|
||||||
|
|
Loading…
Reference in New Issue