forked from goodboy/tractor
1
0
Fork 0

Initial attempt at multi-actor debugging

Allow entering and attaching to a `pdb` instance in a child process.
The current hackery is to have the child make an rpc to the parent and
ask it to hijack stdin, once complete the child enters a `pdb` blocking
method. The parent then relays all stdin input to the child thus
controlling the "remote" debugger.

A few things were added to accomplish this:
- tracking the mapping of subactors to their parent nurseries
- in the root actor, cancelling all nurseries under the root `trio` task
  on cancellation (i.e. `Actor.cancel()`)
- pass a "runtime vars" map down the actor tree for propagating global state
stin_char_relay
Tyler Goodlet 2020-07-23 13:23:55 -04:00
parent 96cf4b6e9f
commit 23fee8820b
5 changed files with 174 additions and 11 deletions

View File

@ -7,6 +7,7 @@ from itertools import chain
import importlib import importlib
import importlib.util import importlib.util
import inspect import inspect
import bdb
import uuid import uuid
import typing import typing
from typing import Dict, List, Tuple, Any, Optional from typing import Dict, List, Tuple, Any, Optional
@ -125,8 +126,15 @@ async def _invoke(
task_status.started(cs) task_status.started(cs)
await chan.send({'return': await coro, 'cid': cid}) await chan.send({'return': await coro, 'cid': cid})
except (Exception, trio.MultiError) as err: except (Exception, trio.MultiError) as err:
# always ship errors back to caller
log.exception("Actor errored:") log.exception("Actor errored:")
# NOTE: don't enter debug mode recursively after quitting pdb
if _state.debug_mode() and not isinstance(err, bdb.BdbQuit):
# Allow for pdb control in parent
from ._debug import post_mortem
await post_mortem()
# always ship errors back to caller
err_msg = pack_error(err) err_msg = pack_error(err)
err_msg['cid'] = cid err_msg['cid'] = cid
try: try:
@ -182,6 +190,7 @@ class Actor:
def __init__( def __init__(
self, self,
name: str, name: str,
*,
rpc_module_paths: List[str] = [], rpc_module_paths: List[str] = [],
statespace: Optional[Dict[str, Any]] = None, statespace: Optional[Dict[str, Any]] = None,
uid: str = None, uid: str = None,
@ -235,6 +244,7 @@ class Actor:
self._parent_chan: Optional[Channel] = None self._parent_chan: Optional[Channel] = None
self._forkserver_info: Optional[ self._forkserver_info: Optional[
Tuple[Any, Any, Any, Any, Any]] = None Tuple[Any, Any, Any, Any, Any]] = None
self._actoruid2nursery: Dict[str, 'ActorNursery'] = {}
async def wait_for_peer( async def wait_for_peer(
self, uid: Tuple[str, str] self, uid: Tuple[str, str]
@ -709,11 +719,17 @@ class Actor:
- cancelling all rpc tasks - cancelling all rpc tasks
- cancelling the channel server - cancelling the channel server
- cancel the "root" nursery - cancel the "root" nursery
- if root actor, cancel all nurseries under the root ``trio`` task
""" """
# cancel all ongoing rpc tasks # cancel all ongoing rpc tasks
await self.cancel_rpc_tasks() await self.cancel_rpc_tasks()
self.cancel_server() self.cancel_server()
self._root_nursery.cancel_scope.cancel() self._root_nursery.cancel_scope.cancel()
if self._parent_chan is None: # TODO: more robust check
# we're the root actor or zombied
root = trio.lowlevel.current_root_task()
for n in root.child_nurseries:
n.cancel_scope.cancel()
async def _cancel_task(self, cid, chan): async def _cancel_task(self, cid, chan):
"""Cancel a local task by call-id / channel. """Cancel a local task by call-id / channel.

134
tractor/_debug.py 100644
View File

@ -0,0 +1,134 @@
"""
Multi-core debugging for da peeps!
"""
import pdb
import sys
import tty
from functools import partial
from typing import Awaitable, Tuple
from async_generator import aclosing
import tractor
import trio
from .log import get_logger
log = get_logger(__name__)
__all__ = ['breakpoint', 'post_mortem']
# TODO: is there some way to determine this programatically?
_pdb_exit_patterns = tuple(
str.encode(patt + "\n") for patt in ('c', 'cont', 'continue', 'q', 'quit')
)
def subactoruid2proc(
actor: 'Actor', # noqa
uid: Tuple[str, str]
) -> trio.Process:
n = actor._actoruid2nursery[uid]
_, proc, _ = n._children[uid]
return proc
async def _hijack_stdin_relay_to_child(
subactor_uid: Tuple[str, str]
) -> None:
actor = tractor.current_actor()
proc = subactoruid2proc(actor, subactor_uid)
# nlb = []
async def hijack_stdin():
log.info(f"Hijacking stdin from {actor.uid}")
# try:
# # disable cooked mode
# fd = sys.stdin.fileno()
# old = tty.tcgetattr(fd)
# tty.setcbreak(fd)
# trap std in and relay to subproc
async_stdin = trio.wrap_file(sys.stdin)
async with aclosing(async_stdin):
# while True:
async for msg in async_stdin:
log.trace(f"Stdin input:\n{msg}")
# nlb.append(msg)
# encode to bytes
bmsg = str.encode(msg)
# relay bytes to subproc over pipe
await proc.stdin.send_all(bmsg)
# line = str.encode(''.join(nlb))
# print(line)
if bmsg in _pdb_exit_patterns:
log.info("Closing stdin hijack")
break
# finally:
# tty.tcsetattr(fd, tty.TCSAFLUSH, old)
# schedule hijacking in root scope
actor._root_nursery.start_soon(hijack_stdin)
# XXX: We only make this sync in case someone wants to
# overload the ``breakpoint()`` built-in.
def _breakpoint(debug_func) -> Awaitable[None]:
"""``tractor`` breakpoint entry for engaging pdb machinery
in subactors.
"""
actor = tractor.current_actor()
async def wait_for_parent_stdin_hijack():
log.debug('Breakpoint engaged!')
# TODO: need a more robust check for the "root" actor
if actor._parent_chan:
async with tractor._portal.open_portal(
actor._parent_chan,
start_msg_loop=False,
) as portal:
# with trio.fail_after(1):
await portal.run(
'tractor._debug',
'_hijack_stdin_relay_to_child',
subactor_uid=actor.uid,
)
# block here one frame up where ``breakpoint()``
# was awaited and begin handling stdin
debug_func(actor)
# this must be awaited by caller
return wait_for_parent_stdin_hijack()
def _set_trace(actor):
pdb.set_trace(
header=f"\nAttaching pdb to actor: {actor.uid}\n",
# start 2 levels up
frame=sys._getframe().f_back.f_back,
)
breakpoint = partial(
_breakpoint,
_set_trace,
)
def _post_mortem(actor):
log.error(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
pdb.post_mortem()
post_mortem = partial(
_breakpoint,
_post_mortem,
)

View File

@ -2,7 +2,8 @@
Process entry points. Process entry points.
""" """
from functools import partial from functools import partial
from typing import Tuple, Any from typing import Tuple, Any, Dict
import os
import trio # type: ignore import trio # type: ignore
@ -54,13 +55,17 @@ def _mp_main(
async def _trio_main( async def _trio_main(
actor: 'Actor', actor: 'Actor',
accept_addr: Tuple[str, int], accept_addr: Tuple[str, int],
parent_addr: Tuple[str, int] = None parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
) -> None: ) -> None:
"""Entry point for a `trio_run_in_process` subactor. """Entry point for a `trio_run_in_process` subactor.
Here we don't need to call `trio.run()` since trip does that as Here we don't need to call `trio.run()` since trip does that as
part of its subprocess startup sequence. part of its subprocess startup sequence.
""" """
# TODO: make a global func to set this or is it too hacky?
# os.environ['PYTHONBREAKPOINT'] = 'tractor._debug.breakpoint'
if actor.loglevel is not None: if actor.loglevel is not None:
log.info( log.info(
f"Setting loglevel for {actor.uid} to {actor.loglevel}") f"Setting loglevel for {actor.uid} to {actor.loglevel}")
@ -69,6 +74,7 @@ async def _trio_main(
log.info(f"Started new trio process for {actor.uid}") log.info(f"Started new trio process for {actor.uid}")
_state._current_actor = actor _state._current_actor = actor
_state._runtime_vars.update(_runtime_vars)
await actor._async_main(accept_addr, parent_addr=parent_addr) await actor._async_main(accept_addr, parent_addr=parent_addr)
log.info(f"Actor {actor.uid} terminated") log.info(f"Actor {actor.uid} terminated")

View File

@ -26,7 +26,7 @@ from multiprocessing import forkserver # type: ignore
from typing import Tuple from typing import Tuple
from . import _forkserver_override from . import _forkserver_override
from ._state import current_actor from ._state import current_actor, is_main_process
from .log import get_logger from .log import get_logger
from ._portal import Portal from ._portal import Portal
from ._actor import Actor, ActorFailure from ._actor import Actor, ActorFailure
@ -90,12 +90,6 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
return _ctx return _ctx
def is_main_process() -> bool:
"""Bool determining if this actor is running in the top-most process.
"""
return mp.current_process().name == 'MainProcess'
async def exhaust_portal( async def exhaust_portal(
portal: Portal, portal: Portal,
actor: Actor actor: Actor
@ -188,6 +182,8 @@ async def new_proc(
# passed through to actor main # passed through to actor main
bind_addr: Tuple[str, int], bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
use_trio_run_in_process: bool = False, use_trio_run_in_process: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None:
@ -207,6 +203,7 @@ async def new_proc(
subactor, subactor,
bind_addr, bind_addr,
parent_addr, parent_addr,
_runtime_vars=_runtime_vars,
) as proc: ) as proc:
log.info(f"Started {proc}") log.info(f"Started {proc}")
@ -218,6 +215,10 @@ async def new_proc(
portal = Portal(chan) portal = Portal(chan)
actor_nursery._children[subactor.uid] = ( actor_nursery._children[subactor.uid] = (
subactor, proc, portal) subactor, proc, portal)
curr_actor = current_actor()
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
task_status.started(portal) task_status.started(portal)
# wait for ActorNursery.wait() to be called # wait for ActorNursery.wait() to be called

View File

@ -13,10 +13,11 @@ from ._state import current_actor
from .log import get_logger, get_loglevel from .log import get_logger, get_loglevel
from ._actor import Actor from ._actor import Actor
from ._portal import Portal from ._portal import Portal
from . import _state
from . import _spawn from . import _spawn
log = get_logger('tractor') log = get_logger(__name__)
class ActorNursery: class ActorNursery:
@ -56,6 +57,10 @@ class ActorNursery:
) -> Portal: ) -> Portal:
loglevel = loglevel or self._actor.loglevel or get_loglevel() loglevel = loglevel or self._actor.loglevel or get_loglevel()
# configure and pass runtime state
_rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False
subactor = Actor( subactor = Actor(
name, name,
# modules allowed to invoked funcs from # modules allowed to invoked funcs from
@ -81,6 +86,7 @@ class ActorNursery:
self.errors, self.errors,
bind_addr, bind_addr,
parent_addr, parent_addr,
_rtv, # run time vars
) )
) )