diff --git a/tractor/_actor.py b/tractor/_actor.py index 81f05cd..1fd6dc3 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -7,6 +7,7 @@ from itertools import chain import importlib import importlib.util import inspect +import bdb import uuid import typing from typing import Dict, List, Tuple, Any, Optional @@ -125,8 +126,15 @@ async def _invoke( task_status.started(cs) await chan.send({'return': await coro, 'cid': cid}) except (Exception, trio.MultiError) as err: - # always ship errors back to caller log.exception("Actor errored:") + + # NOTE: don't enter debug mode recursively after quitting pdb + if _state.debug_mode() and not isinstance(err, bdb.BdbQuit): + # Allow for pdb control in parent + from ._debug import post_mortem + await post_mortem() + + # always ship errors back to caller err_msg = pack_error(err) err_msg['cid'] = cid try: @@ -182,6 +190,7 @@ class Actor: def __init__( self, name: str, + *, rpc_module_paths: List[str] = [], statespace: Optional[Dict[str, Any]] = None, uid: str = None, @@ -235,6 +244,7 @@ class Actor: self._parent_chan: Optional[Channel] = None self._forkserver_info: Optional[ Tuple[Any, Any, Any, Any, Any]] = None + self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} async def wait_for_peer( self, uid: Tuple[str, str] @@ -709,11 +719,17 @@ class Actor: - cancelling all rpc tasks - cancelling the channel server - cancel the "root" nursery + - if root actor, cancel all nurseries under the root ``trio`` task """ # cancel all ongoing rpc tasks await self.cancel_rpc_tasks() self.cancel_server() self._root_nursery.cancel_scope.cancel() + if self._parent_chan is None: # TODO: more robust check + # we're the root actor or zombied + root = trio.lowlevel.current_root_task() + for n in root.child_nurseries: + n.cancel_scope.cancel() async def _cancel_task(self, cid, chan): """Cancel a local task by call-id / channel. diff --git a/tractor/_debug.py b/tractor/_debug.py new file mode 100644 index 0000000..e462b76 --- /dev/null +++ b/tractor/_debug.py @@ -0,0 +1,134 @@ +""" +Multi-core debugging for da peeps! +""" +import pdb +import sys +import tty +from functools import partial +from typing import Awaitable, Tuple + +from async_generator import aclosing +import tractor +import trio + +from .log import get_logger + +log = get_logger(__name__) + + +__all__ = ['breakpoint', 'post_mortem'] + + +# TODO: is there some way to determine this programatically? +_pdb_exit_patterns = tuple( + str.encode(patt + "\n") for patt in ('c', 'cont', 'continue', 'q', 'quit') +) + + +def subactoruid2proc( + actor: 'Actor', # noqa + uid: Tuple[str, str] +) -> trio.Process: + n = actor._actoruid2nursery[uid] + _, proc, _ = n._children[uid] + return proc + + +async def _hijack_stdin_relay_to_child( + subactor_uid: Tuple[str, str] +) -> None: + actor = tractor.current_actor() + proc = subactoruid2proc(actor, subactor_uid) + + # nlb = [] + + async def hijack_stdin(): + log.info(f"Hijacking stdin from {actor.uid}") + # try: + # # disable cooked mode + # fd = sys.stdin.fileno() + # old = tty.tcgetattr(fd) + # tty.setcbreak(fd) + + # trap std in and relay to subproc + async_stdin = trio.wrap_file(sys.stdin) + + async with aclosing(async_stdin): + # while True: + async for msg in async_stdin: + log.trace(f"Stdin input:\n{msg}") + # nlb.append(msg) + # encode to bytes + bmsg = str.encode(msg) + + # relay bytes to subproc over pipe + await proc.stdin.send_all(bmsg) + + # line = str.encode(''.join(nlb)) + # print(line) + + if bmsg in _pdb_exit_patterns: + log.info("Closing stdin hijack") + break + # finally: + # tty.tcsetattr(fd, tty.TCSAFLUSH, old) + + # schedule hijacking in root scope + actor._root_nursery.start_soon(hijack_stdin) + + +# XXX: We only make this sync in case someone wants to +# overload the ``breakpoint()`` built-in. +def _breakpoint(debug_func) -> Awaitable[None]: + """``tractor`` breakpoint entry for engaging pdb machinery + in subactors. + """ + actor = tractor.current_actor() + + async def wait_for_parent_stdin_hijack(): + log.debug('Breakpoint engaged!') + + # TODO: need a more robust check for the "root" actor + if actor._parent_chan: + async with tractor._portal.open_portal( + actor._parent_chan, + start_msg_loop=False, + ) as portal: + # with trio.fail_after(1): + await portal.run( + 'tractor._debug', + '_hijack_stdin_relay_to_child', + subactor_uid=actor.uid, + ) + + # block here one frame up where ``breakpoint()`` + # was awaited and begin handling stdin + debug_func(actor) + + # this must be awaited by caller + return wait_for_parent_stdin_hijack() + + +def _set_trace(actor): + pdb.set_trace( + header=f"\nAttaching pdb to actor: {actor.uid}\n", + # start 2 levels up + frame=sys._getframe().f_back.f_back, + ) + + +breakpoint = partial( + _breakpoint, + _set_trace, +) + + +def _post_mortem(actor): + log.error(f"\nAttaching to pdb in crashed actor: {actor.uid}\n") + pdb.post_mortem() + + +post_mortem = partial( + _breakpoint, + _post_mortem, +) diff --git a/tractor/_entry.py b/tractor/_entry.py index 1c26065..0546e1d 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -2,7 +2,8 @@ Process entry points. """ from functools import partial -from typing import Tuple, Any +from typing import Tuple, Any, Dict +import os import trio # type: ignore @@ -54,13 +55,17 @@ def _mp_main( async def _trio_main( actor: 'Actor', accept_addr: Tuple[str, int], - parent_addr: Tuple[str, int] = None + parent_addr: Tuple[str, int], + _runtime_vars: Dict[str, Any], # serialized and sent to _child ) -> None: """Entry point for a `trio_run_in_process` subactor. Here we don't need to call `trio.run()` since trip does that as part of its subprocess startup sequence. """ + # TODO: make a global func to set this or is it too hacky? + # os.environ['PYTHONBREAKPOINT'] = 'tractor._debug.breakpoint' + if actor.loglevel is not None: log.info( f"Setting loglevel for {actor.uid} to {actor.loglevel}") @@ -69,6 +74,7 @@ async def _trio_main( log.info(f"Started new trio process for {actor.uid}") _state._current_actor = actor + _state._runtime_vars.update(_runtime_vars) await actor._async_main(accept_addr, parent_addr=parent_addr) log.info(f"Actor {actor.uid} terminated") diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 8078c03..650a611 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -26,7 +26,7 @@ from multiprocessing import forkserver # type: ignore from typing import Tuple from . import _forkserver_override -from ._state import current_actor +from ._state import current_actor, is_main_process from .log import get_logger from ._portal import Portal from ._actor import Actor, ActorFailure @@ -90,12 +90,6 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: return _ctx -def is_main_process() -> bool: - """Bool determining if this actor is running in the top-most process. - """ - return mp.current_process().name == 'MainProcess' - - async def exhaust_portal( portal: Portal, actor: Actor @@ -188,6 +182,8 @@ async def new_proc( # passed through to actor main bind_addr: Tuple[str, int], parent_addr: Tuple[str, int], + _runtime_vars: Dict[str, Any], # serialized and sent to _child + *, use_trio_run_in_process: bool = False, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: @@ -207,6 +203,7 @@ async def new_proc( subactor, bind_addr, parent_addr, + _runtime_vars=_runtime_vars, ) as proc: log.info(f"Started {proc}") @@ -218,6 +215,10 @@ async def new_proc( portal = Portal(chan) actor_nursery._children[subactor.uid] = ( subactor, proc, portal) + + curr_actor = current_actor() + curr_actor._actoruid2nursery[subactor.uid] = actor_nursery + task_status.started(portal) # wait for ActorNursery.wait() to be called diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 82d2653..b82176b 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -13,10 +13,11 @@ from ._state import current_actor from .log import get_logger, get_loglevel from ._actor import Actor from ._portal import Portal +from . import _state from . import _spawn -log = get_logger('tractor') +log = get_logger(__name__) class ActorNursery: @@ -56,6 +57,10 @@ class ActorNursery: ) -> Portal: loglevel = loglevel or self._actor.loglevel or get_loglevel() + # configure and pass runtime state + _rtv = _state._runtime_vars.copy() + _rtv['_is_root'] = False + subactor = Actor( name, # modules allowed to invoked funcs from @@ -81,6 +86,7 @@ class ActorNursery: self.errors, bind_addr, parent_addr, + _rtv, # run time vars ) )