forked from goodboy/tractor
1
0
Fork 0

Compare commits

...

9 Commits

Author SHA1 Message Date
Tyler Goodlet eebd9bf05c Flip to trace logging 2020-07-27 09:03:41 -04:00
Tyler Goodlet 7886a9fa64 Try ipdb to get tab-complete without a tty
It doesn't work but in theory since ipython uses python-prompt-toolkit
this may be possible (and is really the best solution over trying to
hack ttys/ptys in the child). If `ipdb` is installed try using it.
2020-07-27 09:03:41 -04:00
Tyler Goodlet 042f2326db Play around with relaying each char to child 2020-07-27 09:03:41 -04:00
Tyler Goodlet 7b5af3b2d4 Pass a copy of the expected exposed modules 2020-07-27 00:06:06 -04:00
Tyler Goodlet bcbef1a095 WIP debugging test script 2020-07-27 00:06:06 -04:00
Tyler Goodlet b1897da328 Add support for "debug mode"
When enabled a crashed actor will connect to the parent with `pdb`
in post mortem mode.
2020-07-27 00:06:06 -04:00
Tyler Goodlet 23fee8820b Initial attempt at multi-actor debugging
Allow entering and attaching to a `pdb` instance in a child process.
The current hackery is to have the child make an rpc to the parent and
ask it to hijack stdin, once complete the child enters a `pdb` blocking
method. The parent then relays all stdin input to the child thus
controlling the "remote" debugger.

A few things were added to accomplish this:
- tracking the mapping of subactors to their parent nurseries
- in the root actor, cancelling all nurseries under the root `trio` task
  on cancellation (i.e. `Actor.cancel()`)
- pass a "runtime vars" map down the actor tree for propagating global state
2020-07-27 00:06:06 -04:00
Tyler Goodlet 96cf4b6e9f Allow opening a portal through an existing channel 2020-07-27 00:06:06 -04:00
Tyler Goodlet 90499c19d7 Create runtime variables 2020-07-27 00:06:06 -04:00
10 changed files with 273 additions and 23 deletions

View File

@ -0,0 +1,28 @@
import tractor
import trio
async def bubble():
print('IN BUBBLE')
await trio.sleep(.1)
await tractor.breakpoint()
async def bail():
getattr(doggy)
async def main():
"""The main ``tractor`` routine.
"""
async with tractor.open_nursery() as n:
portal = await n.run_in_actor('future_self', bubble)
# portal = await n.run_in_actor('future_self', bail)
# The ``async with`` will unblock here since the 'some_linguist'
# actor has completed its main task ``cellar_door``.
if __name__ == '__main__':
tractor.run(main, loglevel='trace', debug_mode=True)

View File

@ -63,7 +63,7 @@ def test_rpc_errors(arb_addr, to_call, testdir):
# module should raise a ModuleNotFoundError at import # module should raise a ModuleNotFoundError at import
testdir.makefile('.py', tmp_mod=funcname) testdir.makefile('.py', tmp_mod=funcname)
# no need to exposed module to the subactor # no need to expose module to the subactor
subactor_exposed_mods = exposed_mods subactor_exposed_mods = exposed_mods
exposed_mods = [] exposed_mods = []
func_defined = False func_defined = False
@ -95,7 +95,7 @@ def test_rpc_errors(arb_addr, to_call, testdir):
tractor.run( tractor.run(
main, main,
arbiter_addr=arb_addr, arbiter_addr=arb_addr,
rpc_module_paths=exposed_mods, rpc_module_paths=exposed_mods.copy(),
) )
# handle both parameterized cases # handle both parameterized cases

View File

@ -18,6 +18,7 @@ from ._actor import Actor, _start_actor, Arbiter
from ._trionics import open_nursery from ._trionics import open_nursery
from ._state import current_actor from ._state import current_actor
from ._exceptions import RemoteActorError, ModuleNotExposed from ._exceptions import RemoteActorError, ModuleNotExposed
from ._debug import breakpoint, post_mortem
from . import msg from . import msg
from . import _spawn from . import _spawn
@ -103,14 +104,26 @@ def run(
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
# OR `trio_run_in_process` (the new default). # OR `trio_run_in_process` (the new default).
start_method: Optional[str] = None, start_method: Optional[str] = None,
debug_mode: bool = False,
**kwargs, **kwargs,
) -> Any: ) -> Any:
"""Run a trio-actor async function in process. """Run a trio-actor async function in process.
This is tractor's main entry and the start point for any async actor. This is tractor's main entry and the start point for any async actor.
""" """
# mark top most level process as root actor
_state._runtime_vars['_is_root'] = True
if start_method is not None: if start_method is not None:
_spawn.try_set_start_method(start_method) _spawn.try_set_start_method(start_method)
if debug_mode:
_state._runtime_vars['_debug_mode'] = True
# expose internal debug module to every actor allowing
# for use of ``await tractor.breakpoint()``
kwargs.setdefault('rpc_module_paths', []).append('tractor._debug')
return trio.run(_main, async_fn, args, kwargs, arbiter_addr, name) return trio.run(_main, async_fn, args, kwargs, arbiter_addr, name)

View File

@ -7,6 +7,7 @@ from itertools import chain
import importlib import importlib
import importlib.util import importlib.util
import inspect import inspect
import bdb
import uuid import uuid
import typing import typing
from typing import Dict, List, Tuple, Any, Optional from typing import Dict, List, Tuple, Any, Optional
@ -125,8 +126,15 @@ async def _invoke(
task_status.started(cs) task_status.started(cs)
await chan.send({'return': await coro, 'cid': cid}) await chan.send({'return': await coro, 'cid': cid})
except (Exception, trio.MultiError) as err: except (Exception, trio.MultiError) as err:
# always ship errors back to caller
log.exception("Actor errored:") log.exception("Actor errored:")
# NOTE: don't enter debug mode recursively after quitting pdb
if _state.debug_mode() and not isinstance(err, bdb.BdbQuit):
# Allow for pdb control in parent
from ._debug import post_mortem
await post_mortem()
# always ship errors back to caller
err_msg = pack_error(err) err_msg = pack_error(err)
err_msg['cid'] = cid err_msg['cid'] = cid
try: try:
@ -182,6 +190,7 @@ class Actor:
def __init__( def __init__(
self, self,
name: str, name: str,
*,
rpc_module_paths: List[str] = [], rpc_module_paths: List[str] = [],
statespace: Optional[Dict[str, Any]] = None, statespace: Optional[Dict[str, Any]] = None,
uid: str = None, uid: str = None,
@ -235,6 +244,7 @@ class Actor:
self._parent_chan: Optional[Channel] = None self._parent_chan: Optional[Channel] = None
self._forkserver_info: Optional[ self._forkserver_info: Optional[
Tuple[Any, Any, Any, Any, Any]] = None Tuple[Any, Any, Any, Any, Any]] = None
self._actoruid2nursery: Dict[str, 'ActorNursery'] = {}
async def wait_for_peer( async def wait_for_peer(
self, uid: Tuple[str, str] self, uid: Tuple[str, str]
@ -709,11 +719,17 @@ class Actor:
- cancelling all rpc tasks - cancelling all rpc tasks
- cancelling the channel server - cancelling the channel server
- cancel the "root" nursery - cancel the "root" nursery
- if root actor, cancel all nurseries under the root ``trio`` task
""" """
# cancel all ongoing rpc tasks # cancel all ongoing rpc tasks
await self.cancel_rpc_tasks() await self.cancel_rpc_tasks()
self.cancel_server() self.cancel_server()
self._root_nursery.cancel_scope.cancel() self._root_nursery.cancel_scope.cancel()
if self._parent_chan is None: # TODO: more robust check
# we're the root actor or zombied
root = trio.lowlevel.current_root_task()
for n in root.child_nurseries:
n.cancel_scope.cancel()
async def _cancel_task(self, cid, chan): async def _cancel_task(self, cid, chan):
"""Cancel a local task by call-id / channel. """Cancel a local task by call-id / channel.

154
tractor/_debug.py 100644
View File

@ -0,0 +1,154 @@
"""
Multi-core debugging for da peeps!
"""
import pdb
import sys
import tty
import termios
from functools import partial
from typing import Awaitable, Tuple
from async_generator import aclosing
import tractor
import trio
from .log import get_logger
log = get_logger(__name__)
__all__ = ['breakpoint', 'post_mortem']
# TODO: is there some way to determine this programatically?
_pdb_exit_patterns = tuple(
str.encode(patt + "\n") for patt in ('c', 'cont', 'continue', 'q', 'quit')
)
def subactoruid2proc(
actor: 'Actor', # noqa
uid: Tuple[str, str]
) -> trio.Process:
n = actor._actoruid2nursery[uid]
_, proc, _ = n._children[uid]
return proc
async def _hijack_stdin_relay_to_child(
subactor_uid: Tuple[str, str]
) -> None:
actor = tractor.current_actor()
proc = subactoruid2proc(actor, subactor_uid)
nlb = []
line = []
async def hijack_stdin():
log.info(f"Hijacking stdin from {actor.uid}")
try:
# disable cooked mode
fd = sys.stdin.fileno()
old = tty.tcgetattr(fd)
# tty.setcbreak(fd)
tty.setcbreak(fd)
# trap std in and relay to subproc
async_stdin = trio.wrap_file(sys.stdin)
async with aclosing(async_stdin):
while True:
# async for msg in async_stdin:
msg = await async_stdin.read(1)
nlb.append(msg)
# encode to bytes
bmsg = str.encode(msg)
log.trace(f"Stdin str input:\n{msg}")
log.trace(f"Stdin bytes input:\n{bmsg}")
# relay bytes to subproc over pipe
await proc.stdin.send_all(bmsg)
# termios.tcflush(fd, termios.TCIFLUSH)
# don't write control chars to local stdout
if bmsg not in (b'\t'):
# mirror input to stdout
sys.stdout.write(msg)
sys.stdout.flush()
if bmsg == b'\n':
line = str.encode(''.join(nlb))
# print(line.decode())
if line in _pdb_exit_patterns:
log.info("Closing stdin hijack")
line = []
break
finally:
tty.tcsetattr(fd, tty.TCSAFLUSH, old)
# schedule hijacking in root scope
actor._root_nursery.start_soon(hijack_stdin)
# XXX: We only make this sync in case someone wants to
# overload the ``breakpoint()`` built-in.
def _breakpoint(debug_func) -> Awaitable[None]:
"""``tractor`` breakpoint entry for engaging pdb machinery
in subactors.
"""
actor = tractor.current_actor()
try:
import ipdb
db = ipdb
except ImportError:
import pdb
db = pdb
async def wait_for_parent_stdin_hijack():
log.debug('Breakpoint engaged!')
# TODO: need a more robust check for the "root" actor
if actor._parent_chan:
async with tractor._portal.open_portal(
actor._parent_chan,
start_msg_loop=False,
) as portal:
# with trio.fail_after(1):
await portal.run(
'tractor._debug',
'_hijack_stdin_relay_to_child',
subactor_uid=actor.uid,
)
# block here one frame up where ``breakpoint()``
# was awaited and begin handling stdin
debug_func(actor, db)
# this must be awaited by caller
return wait_for_parent_stdin_hijack()
def _set_trace(actor, dbmod):
dbmod.set_trace(
header=f"\nAttaching pdb to actor: {actor.uid}\n",
# start 2 levels up
frame=sys._getframe().f_back.f_back,
)
breakpoint = partial(
_breakpoint,
_set_trace,
)
def _post_mortem(actor, dbmod):
log.error(
f"\nAttaching to {dbmod} in crashed actor: {actor.uid}\n")
dbmod.post_mortem()
post_mortem = partial(
_breakpoint,
_post_mortem,
)

View File

@ -2,7 +2,8 @@
Process entry points. Process entry points.
""" """
from functools import partial from functools import partial
from typing import Tuple, Any from typing import Tuple, Any, Dict
import os
import trio # type: ignore import trio # type: ignore
@ -54,13 +55,17 @@ def _mp_main(
async def _trio_main( async def _trio_main(
actor: 'Actor', actor: 'Actor',
accept_addr: Tuple[str, int], accept_addr: Tuple[str, int],
parent_addr: Tuple[str, int] = None parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
) -> None: ) -> None:
"""Entry point for a `trio_run_in_process` subactor. """Entry point for a `trio_run_in_process` subactor.
Here we don't need to call `trio.run()` since trip does that as Here we don't need to call `trio.run()` since trip does that as
part of its subprocess startup sequence. part of its subprocess startup sequence.
""" """
# TODO: make a global func to set this or is it too hacky?
# os.environ['PYTHONBREAKPOINT'] = 'tractor._debug.breakpoint'
if actor.loglevel is not None: if actor.loglevel is not None:
log.info( log.info(
f"Setting loglevel for {actor.uid} to {actor.loglevel}") f"Setting loglevel for {actor.uid} to {actor.loglevel}")
@ -69,6 +74,7 @@ async def _trio_main(
log.info(f"Started new trio process for {actor.uid}") log.info(f"Started new trio process for {actor.uid}")
_state._current_actor = actor _state._current_actor = actor
_state._runtime_vars.update(_runtime_vars)
await actor._async_main(accept_addr, parent_addr=parent_addr) await actor._async_main(accept_addr, parent_addr=parent_addr)
log.info(f"Actor {actor.uid} terminated") log.info(f"Actor {actor.uid} terminated")

View File

@ -314,7 +314,8 @@ class LocalPortal:
@asynccontextmanager @asynccontextmanager
async def open_portal( async def open_portal(
channel: Channel, channel: Channel,
nursery: Optional[trio.Nursery] = None nursery: Optional[trio.Nursery] = None,
start_msg_loop: bool = True,
) -> typing.AsyncGenerator[Portal, None]: ) -> typing.AsyncGenerator[Portal, None]:
"""Open a ``Portal`` through the provided ``channel``. """Open a ``Portal`` through the provided ``channel``.
@ -332,6 +333,8 @@ async def open_portal(
if channel.uid is None: if channel.uid is None:
await actor._do_handshake(channel) await actor._do_handshake(channel)
msg_loop_cs = None
if start_msg_loop:
msg_loop_cs: trio.CancelScope = await nursery.start( msg_loop_cs: trio.CancelScope = await nursery.start(
partial( partial(
actor._process_messages, actor._process_messages,
@ -352,6 +355,7 @@ async def open_portal(
await channel.send(None) await channel.send(None)
# cancel background msg loop task # cancel background msg loop task
if msg_loop_cs:
msg_loop_cs.cancel() msg_loop_cs.cancel()
nursery.cancel_scope.cancel() nursery.cancel_scope.cancel()

View File

@ -26,7 +26,7 @@ from multiprocessing import forkserver # type: ignore
from typing import Tuple from typing import Tuple
from . import _forkserver_override from . import _forkserver_override
from ._state import current_actor from ._state import current_actor, is_main_process
from .log import get_logger from .log import get_logger
from ._portal import Portal from ._portal import Portal
from ._actor import Actor, ActorFailure from ._actor import Actor, ActorFailure
@ -90,12 +90,6 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
return _ctx return _ctx
def is_main_process() -> bool:
"""Bool determining if this actor is running in the top-most process.
"""
return mp.current_process().name == 'MainProcess'
async def exhaust_portal( async def exhaust_portal(
portal: Portal, portal: Portal,
actor: Actor actor: Actor
@ -188,6 +182,8 @@ async def new_proc(
# passed through to actor main # passed through to actor main
bind_addr: Tuple[str, int], bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
use_trio_run_in_process: bool = False, use_trio_run_in_process: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None:
@ -207,6 +203,7 @@ async def new_proc(
subactor, subactor,
bind_addr, bind_addr,
parent_addr, parent_addr,
_runtime_vars=_runtime_vars,
) as proc: ) as proc:
log.info(f"Started {proc}") log.info(f"Started {proc}")
@ -218,6 +215,10 @@ async def new_proc(
portal = Portal(chan) portal = Portal(chan)
actor_nursery._children[subactor.uid] = ( actor_nursery._children[subactor.uid] = (
subactor, proc, portal) subactor, proc, portal)
curr_actor = current_actor()
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
task_status.started(portal) task_status.started(portal)
# wait for ActorNursery.wait() to be called # wait for ActorNursery.wait() to be called

View File

@ -3,10 +3,15 @@ Per process state
""" """
from typing import Optional from typing import Optional
from collections import Mapping from collections import Mapping
import multiprocessing as mp
import trio import trio
_current_actor: Optional['Actor'] = None # type: ignore _current_actor: Optional['Actor'] = None # type: ignore
_runtime_vars = {
'_debug_mode': False,
'_is_root': False,
}
def current_actor() -> 'Actor': # type: ignore def current_actor() -> 'Actor': # type: ignore
@ -36,3 +41,20 @@ class ActorContextInfo(Mapping):
except RuntimeError: except RuntimeError:
# no local actor/task context initialized yet # no local actor/task context initialized yet
return f'no {key} context' return f'no {key} context'
def is_main_process() -> bool:
"""Bool determining if this actor is running in the top-most process.
"""
return mp.current_process().name == 'MainProcess'
def debug_mode() -> bool:
"""Bool determining if "debug mode" is on which enables
remote subactor pdb entry on crashes.
"""
return _runtime_vars['_debug_mode']
def is_root_process() -> bool:
return _runtime_vars['_is_root']

View File

@ -13,10 +13,11 @@ from ._state import current_actor
from .log import get_logger, get_loglevel from .log import get_logger, get_loglevel
from ._actor import Actor from ._actor import Actor
from ._portal import Portal from ._portal import Portal
from . import _state
from . import _spawn from . import _spawn
log = get_logger('tractor') log = get_logger(__name__)
class ActorNursery: class ActorNursery:
@ -56,6 +57,10 @@ class ActorNursery:
) -> Portal: ) -> Portal:
loglevel = loglevel or self._actor.loglevel or get_loglevel() loglevel = loglevel or self._actor.loglevel or get_loglevel()
# configure and pass runtime state
_rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False
subactor = Actor( subactor = Actor(
name, name,
# modules allowed to invoked funcs from # modules allowed to invoked funcs from
@ -81,6 +86,7 @@ class ActorNursery:
self.errors, self.errors,
bind_addr, bind_addr,
parent_addr, parent_addr,
_rtv, # run time vars
) )
) )