diff --git a/tractor/_actor.py b/tractor/_actor.py
index d59bd82..9053d6f 100644
--- a/tractor/_actor.py
+++ b/tractor/_actor.py
@@ -7,6 +7,7 @@ from itertools import chain
 import importlib
 import importlib.util
 import inspect
+import bdb
 import uuid
 import typing
 from typing import Dict, List, Tuple, Any, Optional
@@ -125,8 +126,15 @@ async def _invoke(
                         task_status.started(cs)
                         await chan.send({'return': await coro, 'cid': cid})
     except (Exception, trio.MultiError) as err:
-        # always ship errors back to caller
         log.exception("Actor errored:")
+
+        # NOTE: don't enter debug mode recursively after quitting pdb
+        if _state.debug_mode() and not isinstance(err, bdb.BdbQuit):
+            # Allow for pdb control in parent
+            from ._debug import post_mortem
+            await post_mortem()
+
+        # always ship errors back to caller
         err_msg = pack_error(err)
         err_msg['cid'] = cid
         try:
@@ -178,6 +186,7 @@ class Actor:
     def __init__(
         self,
         name: str,
+        *,
         rpc_module_paths: List[str] = [],
         statespace: Optional[Dict[str, Any]] = None,
         uid: str = None,
@@ -237,6 +246,7 @@ class Actor:
         self._parent_chan: Optional[Channel] = None
         self._forkserver_info: Optional[
             Tuple[Any, Any, Any, Any, Any]] = None
+        self._actoruid2nursery: Dict[Tuple[str, str], 'ActorNursery'] = {}
 
     async def wait_for_peer(
         self, uid: Tuple[str, str]
diff --git a/tractor/_debug.py b/tractor/_debug.py
new file mode 100644
index 0000000..e462b76
--- /dev/null
+++ b/tractor/_debug.py
@@ -0,0 +1,143 @@
+"""
+Multi-core debugging for da peeps!
+"""
+import pdb
+import sys
+import tty
+from functools import partial
+from typing import Awaitable, Tuple
+
+from async_generator import aclosing
+import tractor
+import trio
+
+from .log import get_logger
+
+log = get_logger(__name__)
+
+
+__all__ = ['breakpoint', 'post_mortem']
+
+
+# TODO: is there some way to determine this programmatically?
+_pdb_exit_patterns = tuple(
+    str.encode(patt + "\n") for patt in ('c', 'cont', 'continue', 'q', 'quit')
+)
+
+
+def subactoruid2proc(
+    actor: 'Actor',  # noqa
+    uid: Tuple[str, str]
+) -> trio.Process:
+    """Return the process of subactor ``uid`` as tracked by the nursery
+    registered for it on ``actor``.
+    """
+    n = actor._actoruid2nursery[uid]
+    _, proc, _ = n._children[uid]
+    return proc
+
+
+async def _hijack_stdin_relay_to_child(
+    subactor_uid: Tuple[str, str]
+) -> None:
+    """Schedule a task which relays this process's stdin to the subactor
+    ``subactor_uid`` until a pdb exit command is entered.
+    """
+    actor = tractor.current_actor()
+    proc = subactoruid2proc(actor, subactor_uid)
+
+    # nlb = []
+
+    async def hijack_stdin():
+        log.info(f"Hijacking stdin from {actor.uid}")
+        # try:
+        #     # disable cooked mode
+        #     fd = sys.stdin.fileno()
+        #     old = tty.tcgetattr(fd)
+        #     tty.setcbreak(fd)
+
+        # trap std in and relay to subproc
+        async_stdin = trio.wrap_file(sys.stdin)
+
+        async with aclosing(async_stdin):
+            # while True:
+            async for msg in async_stdin:
+                log.trace(f"Stdin input:\n{msg}")
+                # nlb.append(msg)
+                # encode to bytes
+                bmsg = str.encode(msg)
+
+                # relay bytes to subproc over pipe
+                await proc.stdin.send_all(bmsg)
+
+                # line = str.encode(''.join(nlb))
+                # print(line)
+
+                if bmsg in _pdb_exit_patterns:
+                    log.info("Closing stdin hijack")
+                    break
+        # finally:
+        #     tty.tcsetattr(fd, tty.TCSAFLUSH, old)
+
+    # schedule hijacking in root scope
+    actor._root_nursery.start_soon(hijack_stdin)
+
+
+# XXX: We only make this sync in case someone wants to
+# overload the ``breakpoint()`` built-in.
+def _breakpoint(debug_func) -> Awaitable[None]:
+    """``tractor`` breakpoint entry for engaging pdb machinery
+    in subactors.
+    """
+    actor = tractor.current_actor()
+
+    async def wait_for_parent_stdin_hijack():
+        log.debug('Breakpoint engaged!')
+
+        # TODO: need a more robust check for the "root" actor
+        if actor._parent_chan:
+            async with tractor._portal.open_portal(
+                actor._parent_chan,
+                start_msg_loop=False,
+            ) as portal:
+                # with trio.fail_after(1):
+                await portal.run(
+                    'tractor._debug',
+                    '_hijack_stdin_relay_to_child',
+                    subactor_uid=actor.uid,
+                )
+
+        # block here one frame up where ``breakpoint()``
+        # was awaited and begin handling stdin
+        debug_func(actor)
+
+    # this must be awaited by caller
+    return wait_for_parent_stdin_hijack()
+
+
+def _set_trace(actor):
+    """Engage pdb two frames above this call on behalf of ``actor``."""
+    # NOTE: the module-level ``pdb.set_trace()`` only accepts ``header``;
+    # targeting a specific frame requires driving a ``Pdb`` instance.
+    debugger = pdb.Pdb()
+    debugger.message(f"\nAttaching pdb to actor: {actor.uid}\n")
+    # start 2 levels up
+    debugger.set_trace(sys._getframe().f_back.f_back)
+
+
+breakpoint = partial(
+    _breakpoint,
+    _set_trace,
+)
+
+
+def _post_mortem(actor):
+    """Enter pdb post-mortem on the exception currently being handled."""
+    log.error(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
+    pdb.post_mortem()
+
+
+post_mortem = partial(
+    _breakpoint,
+    _post_mortem,
+)
diff --git a/tractor/_entry.py b/tractor/_entry.py
index 883cc6b..71dfdec 100644
--- a/tractor/_entry.py
+++ b/tractor/_entry.py
@@ -57,6 +57,9 @@ def _trio_main(
 ) -> None:
     """Entry point for a `trio_run_in_process` subactor.
     """
+    # TODO: make a global func to set this or is it too hacky?
+    # os.environ['PYTHONBREAKPOINT'] = 'tractor._debug.breakpoint'
+
     if actor.loglevel is not None:
         log.info(
             f"Setting loglevel for {actor.uid} to {actor.loglevel}")
diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index f0eb012..6d19614 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -23,7 +23,7 @@ from multiprocessing import forkserver  # type: ignore
 from typing import Tuple
 
 from . import _forkserver_override
-from ._state import current_actor
+from ._state import current_actor, is_main_process
 from .log import get_logger
 from ._portal import Portal
 from ._actor import Actor, ActorFailure
@@ -87,12 +87,6 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
     return _ctx
 
 
-def is_main_process() -> bool:
-    """Bool determining if this actor is running in the top-most process.
-    """
-    return mp.current_process().name == 'MainProcess'
-
-
 async def exhaust_portal(
     portal: Portal,
     actor: Actor
@@ -206,6 +200,8 @@ async def new_proc(
     # passed through to actor main
     bind_addr: Tuple[str, int],
     parent_addr: Tuple[str, int],
+    _runtime_vars: Dict[str, Any],  # serialized and sent to _child
+    *,
     use_trio_run_in_process: bool = False,
     task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
 ) -> None:
@@ -241,9 +237,14 @@ async def new_proc(
                     "statespace": subactor.statespace,
                     "_arb_addr": subactor._arb_addr,
                     "bind_host": bind_addr[0],
-                    "bind_port": bind_addr[1]
+                    "bind_port": bind_addr[1],
+                    "_runtime_vars": _runtime_vars,
                 })
 
+                # track subactor in current nursery
+                curr_actor = current_actor()
+                curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
+
                 # resume caller at next checkpoint now that child is up
                 task_status.started(portal)
 
diff --git a/tractor/_trionics.py b/tractor/_trionics.py
index e846144..d51a59b 100644
--- a/tractor/_trionics.py
+++ b/tractor/_trionics.py
@@ -13,10 +13,11 @@ from ._state import current_actor
 from .log import get_logger, get_loglevel
 from ._actor import Actor
 from ._portal import Portal
+from . import _state
 from . import _spawn
 
 
-log = get_logger('tractor')
+log = get_logger(__name__)
 
 _default_bind_addr: Tuple[str, int] = ('127.0.0.1', 0)
 
@@ -58,6 +59,10 @@ class ActorNursery:
     ) -> Portal:
         loglevel = loglevel or self._actor.loglevel or get_loglevel()
 
+        # configure and pass runtime state
+        _rtv = _state._runtime_vars.copy()
+        _rtv['_is_root'] = False
+
         subactor = Actor(
             name,
             # modules allowed to invoked funcs from
@@ -83,6 +88,7 @@ class ActorNursery:
                 self.errors,
                 bind_addr,
                 parent_addr,
+                _rtv,  # run time vars
             )
         )
 