forked from goodboy/tractor
1
0
Fork 0

Initial attempt at multi-actor debugging

Allow entering and attaching to a `pdb` instance in a child process.
The current hackery is to have the child make an rpc to the parent and
ask it to hijack stdin, once complete the child enters a `pdb` blocking
method. The parent then relays all stdin input to the child thus
controlling the "remote" debugger.

A few things were added to accomplish this:
- tracking the mapping of subactors to their parent nurseries
- in the root actor, cancelling all nurseries under the root `trio` task
  on cancellation (i.e. `Actor.cancel()`)
- pass a "runtime vars" map down the actor tree for propagating global state
debug_tests
Tyler Goodlet 2020-07-23 13:23:55 -04:00
parent 8c97f7bbb3
commit b11e91375c
5 changed files with 164 additions and 10 deletions

View File

@ -7,6 +7,7 @@ from itertools import chain
import importlib import importlib
import importlib.util import importlib.util
import inspect import inspect
import bdb
import uuid import uuid
import typing import typing
from typing import Dict, List, Tuple, Any, Optional from typing import Dict, List, Tuple, Any, Optional
@ -125,8 +126,15 @@ async def _invoke(
task_status.started(cs) task_status.started(cs)
await chan.send({'return': await coro, 'cid': cid}) await chan.send({'return': await coro, 'cid': cid})
except (Exception, trio.MultiError) as err: except (Exception, trio.MultiError) as err:
# always ship errors back to caller
log.exception("Actor errored:") log.exception("Actor errored:")
# NOTE: don't enter debug mode recursively after quitting pdb
if _state.debug_mode() and not isinstance(err, bdb.BdbQuit):
# Allow for pdb control in parent
from ._debug import post_mortem
await post_mortem()
# always ship errors back to caller
err_msg = pack_error(err) err_msg = pack_error(err)
err_msg['cid'] = cid err_msg['cid'] = cid
try: try:
@ -178,6 +186,7 @@ class Actor:
def __init__( def __init__(
self, self,
name: str, name: str,
*,
rpc_module_paths: List[str] = [], rpc_module_paths: List[str] = [],
statespace: Optional[Dict[str, Any]] = None, statespace: Optional[Dict[str, Any]] = None,
uid: str = None, uid: str = None,
@ -237,6 +246,7 @@ class Actor:
self._parent_chan: Optional[Channel] = None self._parent_chan: Optional[Channel] = None
self._forkserver_info: Optional[ self._forkserver_info: Optional[
Tuple[Any, Any, Any, Any, Any]] = None Tuple[Any, Any, Any, Any, Any]] = None
self._actoruid2nursery: Dict[str, 'ActorNursery'] = {}
async def wait_for_peer( async def wait_for_peer(
self, uid: Tuple[str, str] self, uid: Tuple[str, str]

134
tractor/_debug.py 100644
View File

@ -0,0 +1,134 @@
"""
Multi-core debugging for da peeps!
"""
import pdb
import sys
import tty
from functools import partial
from typing import Awaitable, Tuple
from async_generator import aclosing
import tractor
import trio
from .log import get_logger
log = get_logger(__name__)
__all__ = ['breakpoint', 'post_mortem']
# TODO: is there some way to determine this programatically?
_pdb_exit_patterns = tuple(
str.encode(patt + "\n") for patt in ('c', 'cont', 'continue', 'q', 'quit')
)
def subactoruid2proc(
actor: 'Actor', # noqa
uid: Tuple[str, str]
) -> trio.Process:
n = actor._actoruid2nursery[uid]
_, proc, _ = n._children[uid]
return proc
async def _hijack_stdin_relay_to_child(
subactor_uid: Tuple[str, str]
) -> None:
actor = tractor.current_actor()
proc = subactoruid2proc(actor, subactor_uid)
# nlb = []
async def hijack_stdin():
log.info(f"Hijacking stdin from {actor.uid}")
# try:
# # disable cooked mode
# fd = sys.stdin.fileno()
# old = tty.tcgetattr(fd)
# tty.setcbreak(fd)
# trap std in and relay to subproc
async_stdin = trio.wrap_file(sys.stdin)
async with aclosing(async_stdin):
# while True:
async for msg in async_stdin:
log.trace(f"Stdin input:\n{msg}")
# nlb.append(msg)
# encode to bytes
bmsg = str.encode(msg)
# relay bytes to subproc over pipe
await proc.stdin.send_all(bmsg)
# line = str.encode(''.join(nlb))
# print(line)
if bmsg in _pdb_exit_patterns:
log.info("Closing stdin hijack")
break
# finally:
# tty.tcsetattr(fd, tty.TCSAFLUSH, old)
# schedule hijacking in root scope
actor._root_nursery.start_soon(hijack_stdin)
# XXX: We only make this sync in case someone wants to
# overload the ``breakpoint()`` built-in.
def _breakpoint(debug_func) -> Awaitable[None]:
"""``tractor`` breakpoint entry for engaging pdb machinery
in subactors.
"""
actor = tractor.current_actor()
async def wait_for_parent_stdin_hijack():
log.debug('Breakpoint engaged!')
# TODO: need a more robust check for the "root" actor
if actor._parent_chan:
async with tractor._portal.open_portal(
actor._parent_chan,
start_msg_loop=False,
) as portal:
# with trio.fail_after(1):
await portal.run(
'tractor._debug',
'_hijack_stdin_relay_to_child',
subactor_uid=actor.uid,
)
# block here one frame up where ``breakpoint()``
# was awaited and begin handling stdin
debug_func(actor)
# this must be awaited by caller
return wait_for_parent_stdin_hijack()
def _set_trace(actor):
pdb.set_trace(
header=f"\nAttaching pdb to actor: {actor.uid}\n",
# start 2 levels up
frame=sys._getframe().f_back.f_back,
)
breakpoint = partial(
_breakpoint,
_set_trace,
)
def _post_mortem(actor):
log.error(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
pdb.post_mortem()
post_mortem = partial(
_breakpoint,
_post_mortem,
)

View File

@ -57,6 +57,9 @@ def _trio_main(
) -> None: ) -> None:
"""Entry point for a `trio_run_in_process` subactor. """Entry point for a `trio_run_in_process` subactor.
""" """
# TODO: make a global func to set this or is it too hacky?
# os.environ['PYTHONBREAKPOINT'] = 'tractor._debug.breakpoint'
if actor.loglevel is not None: if actor.loglevel is not None:
log.info( log.info(
f"Setting loglevel for {actor.uid} to {actor.loglevel}") f"Setting loglevel for {actor.uid} to {actor.loglevel}")

View File

@ -23,7 +23,7 @@ from multiprocessing import forkserver # type: ignore
from typing import Tuple from typing import Tuple
from . import _forkserver_override from . import _forkserver_override
from ._state import current_actor from ._state import current_actor, is_main_process
from .log import get_logger from .log import get_logger
from ._portal import Portal from ._portal import Portal
from ._actor import Actor, ActorFailure from ._actor import Actor, ActorFailure
@ -87,12 +87,6 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
return _ctx return _ctx
def is_main_process() -> bool:
"""Bool determining if this actor is running in the top-most process.
"""
return mp.current_process().name == 'MainProcess'
async def exhaust_portal( async def exhaust_portal(
portal: Portal, portal: Portal,
actor: Actor actor: Actor
@ -206,6 +200,8 @@ async def new_proc(
# passed through to actor main # passed through to actor main
bind_addr: Tuple[str, int], bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
use_trio_run_in_process: bool = False, use_trio_run_in_process: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None:
@ -241,9 +237,14 @@ async def new_proc(
"statespace": subactor.statespace, "statespace": subactor.statespace,
"_arb_addr": subactor._arb_addr, "_arb_addr": subactor._arb_addr,
"bind_host": bind_addr[0], "bind_host": bind_addr[0],
"bind_port": bind_addr[1] "bind_port": bind_addr[1],
"_runtime_vars": _runtime_vars,
}) })
# track subactor in current nursery
curr_actor = current_actor()
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
# resume caller at next checkpoint now that child is up # resume caller at next checkpoint now that child is up
task_status.started(portal) task_status.started(portal)

View File

@ -13,10 +13,11 @@ from ._state import current_actor
from .log import get_logger, get_loglevel from .log import get_logger, get_loglevel
from ._actor import Actor from ._actor import Actor
from ._portal import Portal from ._portal import Portal
from . import _state
from . import _spawn from . import _spawn
log = get_logger('tractor') log = get_logger(__name__)
_default_bind_addr: Tuple[str, int] = ('127.0.0.1', 0) _default_bind_addr: Tuple[str, int] = ('127.0.0.1', 0)
@ -58,6 +59,10 @@ class ActorNursery:
) -> Portal: ) -> Portal:
loglevel = loglevel or self._actor.loglevel or get_loglevel() loglevel = loglevel or self._actor.loglevel or get_loglevel()
# configure and pass runtime state
_rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False
subactor = Actor( subactor = Actor(
name, name,
# modules allowed to invoked funcs from # modules allowed to invoked funcs from
@ -83,6 +88,7 @@ class ActorNursery:
self.errors, self.errors,
bind_addr, bind_addr,
parent_addr, parent_addr,
_rtv, # run time vars
) )
) )