forked from goodboy/tractor
Support entering post mortem on crashes in root actor
parent
291ecec070
commit
150179bfe4
|
@ -1,8 +1,10 @@
|
||||||
"""
|
"""
|
||||||
Multi-core debugging for da peeps!
|
Multi-core debugging for da peeps!
|
||||||
"""
|
"""
|
||||||
|
import bdb
|
||||||
import sys
|
import sys
|
||||||
from functools import partial
|
from functools import partial
|
||||||
|
from contextlib import asynccontextmanager, AsyncExitStack
|
||||||
from typing import Awaitable, Tuple, Optional, Callable
|
from typing import Awaitable, Tuple, Optional, Callable
|
||||||
|
|
||||||
from async_generator import aclosing
|
from async_generator import aclosing
|
||||||
|
@ -10,6 +12,7 @@ import tractor
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
|
from . import _state
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# wtf: only exported when installed in dev mode?
|
# wtf: only exported when installed in dev mode?
|
||||||
|
@ -92,37 +95,40 @@ class PdbwTeardown(pdbpp.Pdb):
|
||||||
# if bmsg in _pdb_exit_patterns:
|
# if bmsg in _pdb_exit_patterns:
|
||||||
# log.info("Closing stdin hijack")
|
# log.info("Closing stdin hijack")
|
||||||
# break
|
# break
|
||||||
|
@asynccontextmanager
|
||||||
|
async def _acquire_debug_lock():
|
||||||
|
"""Acquire an actor-local FIFO lock meant to mutex entry to a local
|
||||||
|
debugger entry point to avoid tty clobbering by multiple processes.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
actor = tractor.current_actor()
|
||||||
|
debug_lock = actor.statespace.setdefault(
|
||||||
|
'_debug_lock', trio.StrictFIFOLock()
|
||||||
|
)
|
||||||
|
await debug_lock.acquire()
|
||||||
|
log.error("TTY lock acquired")
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
if debug_lock.locked():
|
||||||
|
debug_lock.release()
|
||||||
|
log.error("TTY lock released")
|
||||||
|
|
||||||
|
|
||||||
async def _hijack_stdin_relay_to_child(
|
async def _hijack_stdin_relay_to_child(
|
||||||
subactor_uid: Tuple[str, str]
|
subactor_uid: Tuple[str, str]
|
||||||
) -> None:
|
) -> None:
|
||||||
actor = tractor.current_actor()
|
|
||||||
debug_lock = actor.statespace.setdefault(
|
|
||||||
'_debug_lock', trio.StrictFIFOLock()
|
|
||||||
)
|
|
||||||
|
|
||||||
log.debug(f"Actor {subactor_uid} is waiting on stdin hijack lock")
|
|
||||||
await debug_lock.acquire()
|
|
||||||
log.warning(f"Actor {subactor_uid} acquired stdin hijack lock")
|
|
||||||
|
|
||||||
# TODO: when we get to true remote debugging
|
# TODO: when we get to true remote debugging
|
||||||
# this will deliver stdin data
|
# this will deliver stdin data
|
||||||
try:
|
log.debug(f"Actor {subactor_uid} is waiting on stdin hijack lock")
|
||||||
|
async with _acquire_debug_lock():
|
||||||
|
log.warning(f"Actor {subactor_uid} acquired stdin hijack lock")
|
||||||
# indicate to child that we've locked stdio
|
# indicate to child that we've locked stdio
|
||||||
yield 'Locked'
|
yield 'Locked'
|
||||||
|
|
||||||
# wait for cancellation of stream by child
|
# wait for cancellation of stream by child
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
# TODO: for remote debugging schedule hijacking in root scope
|
log.debug(f"Actor {subactor_uid} released stdin hijack lock")
|
||||||
# (see above)
|
|
||||||
# actor._root_nursery.start_soon(hijack_stdin)
|
|
||||||
|
|
||||||
finally:
|
|
||||||
if debug_lock.locked():
|
|
||||||
debug_lock.release()
|
|
||||||
log.debug(f"Actor {subactor_uid} released stdin hijack lock")
|
|
||||||
|
|
||||||
|
|
||||||
# XXX: We only make this sync in case someone wants to
|
# XXX: We only make this sync in case someone wants to
|
||||||
|
@ -137,34 +143,31 @@ def _breakpoint(debug_func) -> Awaitable[None]:
|
||||||
async def wait_for_parent_stdin_hijack(
|
async def wait_for_parent_stdin_hijack(
|
||||||
task_status=trio.TASK_STATUS_IGNORED
|
task_status=trio.TASK_STATUS_IGNORED
|
||||||
):
|
):
|
||||||
|
try:
|
||||||
# TODO: need a more robust check for the "root" actor
|
async with tractor._portal.open_portal(
|
||||||
if actor._parent_chan:
|
actor._parent_chan,
|
||||||
try:
|
start_msg_loop=False,
|
||||||
async with tractor._portal.open_portal(
|
shield=True,
|
||||||
actor._parent_chan,
|
) as portal:
|
||||||
start_msg_loop=False,
|
with trio.fail_after(1):
|
||||||
shield=True,
|
|
||||||
) as portal:
|
|
||||||
# with trio.fail_after(1):
|
|
||||||
agen = await portal.run(
|
agen = await portal.run(
|
||||||
'tractor._debug',
|
'tractor._debug',
|
||||||
'_hijack_stdin_relay_to_child',
|
'_hijack_stdin_relay_to_child',
|
||||||
subactor_uid=actor.uid,
|
subactor_uid=actor.uid,
|
||||||
)
|
)
|
||||||
async with aclosing(agen):
|
async with aclosing(agen):
|
||||||
async for val in agen:
|
async for val in agen:
|
||||||
assert val == 'Locked'
|
assert val == 'Locked'
|
||||||
task_status.started()
|
task_status.started()
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await do_unlock.wait()
|
await do_unlock.wait()
|
||||||
|
|
||||||
# trigger cancellation of remote stream
|
# trigger cancellation of remote stream
|
||||||
break
|
break
|
||||||
finally:
|
finally:
|
||||||
log.debug(f"Exiting debugger for actor {actor}")
|
log.debug(f"Exiting debugger for actor {actor}")
|
||||||
actor.statespace['_in_debug'] = False
|
actor.statespace['_in_debug'] = False
|
||||||
log.debug(f"Child {actor} released parent stdio lock")
|
log.debug(f"Child {actor} released parent stdio lock")
|
||||||
|
|
||||||
async def _bp():
|
async def _bp():
|
||||||
"""Async breakpoint which schedules a parent stdio lock, and once complete
|
"""Async breakpoint which schedules a parent stdio lock, and once complete
|
||||||
|
@ -182,10 +185,27 @@ def _breakpoint(debug_func) -> Awaitable[None]:
|
||||||
|
|
||||||
actor.statespace['_in_debug'] = True
|
actor.statespace['_in_debug'] = True
|
||||||
|
|
||||||
# this **must** be awaited by the caller and is done using the
|
# TODO: need a more robust check for the "root" actor
|
||||||
# root nursery so that the debugger can continue to run without
|
if actor._parent_chan:
|
||||||
# being restricted by the scope of a new task nursery.
|
# this **must** be awaited by the caller and is done using the
|
||||||
await actor._service_n.start(wait_for_parent_stdin_hijack)
|
# root nursery so that the debugger can continue to run without
|
||||||
|
# being restricted by the scope of a new task nursery.
|
||||||
|
await actor._service_n.start(wait_for_parent_stdin_hijack)
|
||||||
|
|
||||||
|
# block here one (at the appropriate frame *up* where
|
||||||
|
# ``breakpoint()`` was awaited and begin handling stdio
|
||||||
|
# debug_func(actor)
|
||||||
|
else:
|
||||||
|
# we also wait in the root-parent for any child that
|
||||||
|
# may have the tty locked prior
|
||||||
|
async def _lock(
|
||||||
|
task_status=trio.TASK_STATUS_IGNORED
|
||||||
|
):
|
||||||
|
async with _acquire_debug_lock():
|
||||||
|
task_status.started()
|
||||||
|
await do_unlock.wait()
|
||||||
|
|
||||||
|
await actor._service_n.start(_lock)
|
||||||
|
|
||||||
# block here one (at the appropriate frame *up* where
|
# block here one (at the appropriate frame *up* where
|
||||||
# ``breakpoint()`` was awaited and begin handling stdio
|
# ``breakpoint()`` was awaited and begin handling stdio
|
||||||
|
@ -218,3 +238,24 @@ post_mortem = partial(
|
||||||
_breakpoint,
|
_breakpoint,
|
||||||
_post_mortem,
|
_post_mortem,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def _maybe_enter_pm(err):
|
||||||
|
if (
|
||||||
|
_state.debug_mode()
|
||||||
|
and not isinstance(err, bdb.BdbQuit)
|
||||||
|
|
||||||
|
# XXX: if the error is the likely result of runtime-wide
|
||||||
|
# cancellation, we don't want to enter the debugger since
|
||||||
|
# there are races between when the parent actor has killed all
|
||||||
|
# comms and when the child tries to contact said parent to
|
||||||
|
# acquire the tty lock.
|
||||||
|
# Really we just want to mostly avoid catching KBIs (KeyboardInterrupts) here so there
|
||||||
|
# might be a simpler check we can do?
|
||||||
|
and trio.MultiError.filter(
|
||||||
|
lambda exc: exc if not isinstance(exc, trio.Cancelled) else None,
|
||||||
|
err,
|
||||||
|
)
|
||||||
|
):
|
||||||
|
log.warning("Actor crashed, entering debug mode")
|
||||||
|
await post_mortem()
|
||||||
|
|
Loading…
Reference in New Issue