forked from goodboy/tractor
commit
56297cf25c
|
@ -0,0 +1,27 @@
|
||||||
|
import trio
|
||||||
|
import tractor
|
||||||
|
|
||||||
|
async def die():
|
||||||
|
raise RuntimeError
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
async with tractor.open_nursery() as tn:
|
||||||
|
|
||||||
|
debug_actor = await tn.start_actor(
|
||||||
|
'debugged_boi',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
debug_mode=True,
|
||||||
|
)
|
||||||
|
crash_boi = await tn.start_actor(
|
||||||
|
'crash_boi',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
# debug_mode=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
async with trio.open_nursery() as n:
|
||||||
|
n.start_soon(debug_actor.run, die)
|
||||||
|
n.start_soon(crash_boi.run, die)
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
trio.run(main)
|
|
@ -0,0 +1,9 @@
|
||||||
|
Add a per actor ``debug_mode: bool`` control to our nursery.
|
||||||
|
|
||||||
|
This allows spawning actors via ``ActorNursery.start_actor()`` (and
|
||||||
|
other dependent methods) with a ``debug_mode=True`` flag much like
|
||||||
|
``tractor.open_nursery():`` such that per process crash handling
|
||||||
|
can be toggled for cases where a user does not need/want all child actors
|
||||||
|
to drop into the debugger on error. This is often useful when you have
|
||||||
|
actor-tasks which are expected to error often (and be re-run) but want
|
||||||
|
to specifically interact with some (problematic) child.
|
|
@ -52,7 +52,7 @@ def repodir():
|
||||||
def pytest_addoption(parser):
|
def pytest_addoption(parser):
|
||||||
parser.addoption(
|
parser.addoption(
|
||||||
"--ll", action="store", dest='loglevel',
|
"--ll", action="store", dest='loglevel',
|
||||||
default=None, help="logging level to set when testing"
|
default='ERROR', help="logging level to set when testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
parser.addoption(
|
parser.addoption(
|
||||||
|
@ -75,6 +75,7 @@ def pytest_configure(config):
|
||||||
def loglevel(request):
|
def loglevel(request):
|
||||||
orig = tractor.log._default_loglevel
|
orig = tractor.log._default_loglevel
|
||||||
level = tractor.log._default_loglevel = request.config.option.loglevel
|
level = tractor.log._default_loglevel = request.config.option.loglevel
|
||||||
|
tractor.log.get_console_log(level)
|
||||||
yield level
|
yield level
|
||||||
tractor.log._default_loglevel = orig
|
tractor.log._default_loglevel = orig
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,11 @@ That native debug better work!
|
||||||
All these tests can be understood (somewhat) by running the equivalent
|
All these tests can be understood (somewhat) by running the equivalent
|
||||||
`examples/debugging/` scripts manually.
|
`examples/debugging/` scripts manually.
|
||||||
|
|
||||||
TODO: None of these tests have been run successfully on windows yet.
|
TODO:
|
||||||
|
- none of these tests have been run successfully on windows yet but
|
||||||
|
there's been manual testing that verified it works.
|
||||||
|
- wonder if any of it'll work on OS X?
|
||||||
|
|
||||||
"""
|
"""
|
||||||
import time
|
import time
|
||||||
from os import path
|
from os import path
|
||||||
|
@ -279,8 +283,10 @@ def test_multi_subactors(spawn):
|
||||||
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
|
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
|
||||||
|
|
||||||
# wait for spawn error to show up
|
# wait for spawn error to show up
|
||||||
while 'breakpoint_forever' in before:
|
spawn_err = "Attaching to pdb in crashed actor: ('spawn_error'"
|
||||||
|
while spawn_err not in before:
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
|
time.sleep(0.1)
|
||||||
child.expect(r"\(Pdb\+\+\)")
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
|
|
||||||
|
@ -288,7 +294,7 @@ def test_multi_subactors(spawn):
|
||||||
# child.sendline('c')
|
# child.sendline('c')
|
||||||
# child.expect(r"\(Pdb\+\+\)")
|
# child.expect(r"\(Pdb\+\+\)")
|
||||||
# before = str(child.before.decode())
|
# before = str(child.before.decode())
|
||||||
assert "Attaching to pdb in crashed actor: ('spawn_error'" in before
|
assert spawn_err in before
|
||||||
assert "RemoteActorError: ('name_error_1'" in before
|
assert "RemoteActorError: ('name_error_1'" in before
|
||||||
|
|
||||||
# now run some "continues" to show re-entries
|
# now run some "continues" to show re-entries
|
||||||
|
@ -399,9 +405,11 @@ def test_multi_daemon_subactors(spawn, loglevel):
|
||||||
|
|
||||||
|
|
||||||
def test_multi_subactors_root_errors(spawn):
|
def test_multi_subactors_root_errors(spawn):
|
||||||
"""Multiple subactors, both erroring and breakpointing as well as
|
'''
|
||||||
|
Multiple subactors, both erroring and breakpointing as well as
|
||||||
a nested subactor erroring.
|
a nested subactor erroring.
|
||||||
"""
|
|
||||||
|
'''
|
||||||
child = spawn('multi_subactor_root_errors')
|
child = spawn('multi_subactor_root_errors')
|
||||||
|
|
||||||
# scan for the pdbpp prompt
|
# scan for the pdbpp prompt
|
||||||
|
@ -559,3 +567,32 @@ def test_root_cancels_child_context_during_startup(
|
||||||
|
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
child.expect(pexpect.EOF)
|
child.expect(pexpect.EOF)
|
||||||
|
|
||||||
|
|
||||||
|
def test_different_debug_mode_per_actor(
|
||||||
|
spawn,
|
||||||
|
):
|
||||||
|
child = spawn('per_actor_debug')
|
||||||
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
|
# only one actor should enter the debugger
|
||||||
|
before = str(child.before.decode())
|
||||||
|
assert "Attaching to pdb in crashed actor: ('debugged_boi'" in before
|
||||||
|
assert "RuntimeError" in before
|
||||||
|
|
||||||
|
child.sendline('c')
|
||||||
|
child.expect(pexpect.EOF)
|
||||||
|
|
||||||
|
before = str(child.before.decode())
|
||||||
|
|
||||||
|
# NOTE: this debugged actor error currently WON'T show up since the
|
||||||
|
# root will actually cancel and terminate the nursery before the error
|
||||||
|
# msg reported back from the debug mode actor is processed.
|
||||||
|
# assert "tractor._exceptions.RemoteActorError: ('debugged_boi'" in before
|
||||||
|
|
||||||
|
assert "tractor._exceptions.RemoteActorError: ('crash_boi'" in before
|
||||||
|
|
||||||
|
# the crash boi should not have made a debugger request but
|
||||||
|
# instead crashed completely
|
||||||
|
assert "tractor._exceptions.RemoteActorError: ('crash_boi'" in before
|
||||||
|
assert "RuntimeError" in before
|
||||||
|
|
|
@ -252,7 +252,7 @@ async def _hijack_stdin_for_child(
|
||||||
|
|
||||||
# indicate to child that we've locked stdio
|
# indicate to child that we've locked stdio
|
||||||
await ctx.started('Locked')
|
await ctx.started('Locked')
|
||||||
log.pdb(f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
|
log.debug(f"Actor {subactor_uid} acquired stdin hijack lock")
|
||||||
|
|
||||||
# wait for unlock pdb by child
|
# wait for unlock pdb by child
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
|
@ -578,9 +578,11 @@ async def acquire_debug_lock(
|
||||||
async def maybe_wait_for_debugger(
|
async def maybe_wait_for_debugger(
|
||||||
poll_steps: int = 2,
|
poll_steps: int = 2,
|
||||||
poll_delay: float = 0.1,
|
poll_delay: float = 0.1,
|
||||||
|
child_in_debug: bool = False,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
if not debug_mode():
|
if not debug_mode() and not child_in_debug:
|
||||||
return
|
return
|
||||||
|
|
||||||
if (
|
if (
|
||||||
|
@ -602,7 +604,7 @@ async def maybe_wait_for_debugger(
|
||||||
if _global_actor_in_debug:
|
if _global_actor_in_debug:
|
||||||
sub_in_debug = tuple(_global_actor_in_debug)
|
sub_in_debug = tuple(_global_actor_in_debug)
|
||||||
|
|
||||||
log.warning(
|
log.debug(
|
||||||
'Root polling for debug')
|
'Root polling for debug')
|
||||||
|
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
|
@ -619,7 +621,7 @@ async def maybe_wait_for_debugger(
|
||||||
(debug_complete and
|
(debug_complete and
|
||||||
not debug_complete.is_set())
|
not debug_complete.is_set())
|
||||||
):
|
):
|
||||||
log.warning(
|
log.debug(
|
||||||
'Root has errored but pdb is in use by '
|
'Root has errored but pdb is in use by '
|
||||||
f'child {sub_in_debug}\n'
|
f'child {sub_in_debug}\n'
|
||||||
'Waiting on tty lock to release..')
|
'Waiting on tty lock to release..')
|
||||||
|
@ -629,6 +631,6 @@ async def maybe_wait_for_debugger(
|
||||||
await trio.sleep(poll_delay)
|
await trio.sleep(poll_delay)
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
log.warning(
|
log.debug(
|
||||||
'Root acquired TTY LOCK'
|
'Root acquired TTY LOCK'
|
||||||
)
|
)
|
||||||
|
|
|
@ -1,9 +1,11 @@
|
||||||
"""
|
'''
|
||||||
Root actor runtime ignition(s).
|
Root actor runtime ignition(s).
|
||||||
"""
|
|
||||||
|
'''
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from functools import partial
|
from functools import partial
|
||||||
import importlib
|
import importlib
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
from typing import Tuple, Optional, List, Any
|
from typing import Tuple, Optional, List, Any
|
||||||
import typing
|
import typing
|
||||||
|
@ -80,6 +82,20 @@ async def open_root_actor(
|
||||||
if start_method is not None:
|
if start_method is not None:
|
||||||
_spawn.try_set_start_method(start_method)
|
_spawn.try_set_start_method(start_method)
|
||||||
|
|
||||||
|
arbiter_addr = (host, port) = arbiter_addr or (
|
||||||
|
_default_arbiter_host,
|
||||||
|
_default_arbiter_port,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
if loglevel is None:
|
||||||
|
loglevel = log.get_loglevel()
|
||||||
|
else:
|
||||||
|
log._default_loglevel = loglevel
|
||||||
|
log.get_console_log(loglevel)
|
||||||
|
|
||||||
|
assert loglevel
|
||||||
|
|
||||||
if debug_mode and _spawn._spawn_method == 'trio':
|
if debug_mode and _spawn._spawn_method == 'trio':
|
||||||
_state._runtime_vars['_debug_mode'] = True
|
_state._runtime_vars['_debug_mode'] = True
|
||||||
|
|
||||||
|
@ -87,24 +103,22 @@ async def open_root_actor(
|
||||||
# for use of ``await tractor.breakpoint()``
|
# for use of ``await tractor.breakpoint()``
|
||||||
enable_modules.append('tractor._debug')
|
enable_modules.append('tractor._debug')
|
||||||
|
|
||||||
if loglevel is None:
|
# if debug mode get's enabled *at least* use that level of
|
||||||
loglevel = 'pdb'
|
# logging for some informative console prompts.
|
||||||
|
if (
|
||||||
|
logging.getLevelName(
|
||||||
|
# lul, need the upper case for the -> int map?
|
||||||
|
# sweet "dynamic function behaviour" stdlib...
|
||||||
|
loglevel.upper()
|
||||||
|
) > logging.getLevelName('PDB')
|
||||||
|
):
|
||||||
|
loglevel = 'PDB'
|
||||||
|
|
||||||
elif debug_mode:
|
elif debug_mode:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
"Debug mode is only supported for the `trio` backend!"
|
"Debug mode is only supported for the `trio` backend!"
|
||||||
)
|
)
|
||||||
|
|
||||||
arbiter_addr = (host, port) = arbiter_addr or (
|
|
||||||
_default_arbiter_host,
|
|
||||||
_default_arbiter_port,
|
|
||||||
)
|
|
||||||
|
|
||||||
loglevel = loglevel or log.get_loglevel()
|
|
||||||
if loglevel is not None:
|
|
||||||
log._default_loglevel = loglevel
|
|
||||||
log.get_console_log(loglevel)
|
|
||||||
|
|
||||||
# make a temporary connection to see if an arbiter exists
|
# make a temporary connection to see if an arbiter exists
|
||||||
arbiter_found = False
|
arbiter_found = False
|
||||||
|
|
||||||
|
@ -238,18 +252,20 @@ def run(
|
||||||
|
|
||||||
|
|
||||||
def run_daemon(
|
def run_daemon(
|
||||||
rpc_module_paths: List[str],
|
enable_modules: list[str],
|
||||||
**kwargs
|
**kwargs
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Spawn daemon actor which will respond to RPC.
|
'''
|
||||||
|
Spawn daemon actor which will respond to RPC.
|
||||||
|
|
||||||
This is a convenience wrapper around
|
This is a convenience wrapper around
|
||||||
``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned
|
``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned
|
||||||
is meant to run forever responding to RPC requests.
|
is meant to run forever responding to RPC requests.
|
||||||
"""
|
|
||||||
kwargs['rpc_module_paths'] = list(rpc_module_paths)
|
|
||||||
|
|
||||||
for path in rpc_module_paths:
|
'''
|
||||||
|
kwargs['enable_modules'] = list(enable_modules)
|
||||||
|
|
||||||
|
for path in enable_modules:
|
||||||
importlib.import_module(path)
|
importlib.import_module(path)
|
||||||
|
|
||||||
return run(partial(trio.sleep, float('inf')), **kwargs)
|
return run(partial(trio.sleep, float('inf')), **kwargs)
|
||||||
|
|
|
@ -366,7 +366,9 @@ async def new_proc(
|
||||||
await proc.wait()
|
await proc.wait()
|
||||||
|
|
||||||
if is_root_process():
|
if is_root_process():
|
||||||
await maybe_wait_for_debugger()
|
await maybe_wait_for_debugger(
|
||||||
|
child_in_debug=_runtime_vars.get('_debug_mode', False),
|
||||||
|
)
|
||||||
|
|
||||||
if proc.poll() is None:
|
if proc.poll() is None:
|
||||||
log.cancel(f"Attempting to hard kill {proc}")
|
log.cancel(f"Attempting to hard kill {proc}")
|
||||||
|
|
|
@ -11,9 +11,8 @@ import warnings
|
||||||
import trio
|
import trio
|
||||||
from async_generator import asynccontextmanager
|
from async_generator import asynccontextmanager
|
||||||
|
|
||||||
from . import _debug
|
|
||||||
from ._debug import maybe_wait_for_debugger
|
from ._debug import maybe_wait_for_debugger
|
||||||
from ._state import current_actor, is_main_process, is_root_process
|
from ._state import current_actor, is_main_process
|
||||||
from .log import get_logger, get_loglevel
|
from .log import get_logger, get_loglevel
|
||||||
from ._actor import Actor
|
from ._actor import Actor
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
|
@ -51,6 +50,7 @@ class ActorNursery:
|
||||||
self._cancel_after_result_on_exit: set = set()
|
self._cancel_after_result_on_exit: set = set()
|
||||||
self.cancelled: bool = False
|
self.cancelled: bool = False
|
||||||
self._join_procs = trio.Event()
|
self._join_procs = trio.Event()
|
||||||
|
self._at_least_one_child_in_debug: bool = False
|
||||||
self.errors = errors
|
self.errors = errors
|
||||||
self.exited = trio.Event()
|
self.exited = trio.Event()
|
||||||
|
|
||||||
|
@ -63,13 +63,24 @@ class ActorNursery:
|
||||||
enable_modules: List[str] = None,
|
enable_modules: List[str] = None,
|
||||||
loglevel: str = None, # set log level per subactor
|
loglevel: str = None, # set log level per subactor
|
||||||
nursery: trio.Nursery = None,
|
nursery: trio.Nursery = None,
|
||||||
|
debug_mode: Optional[bool] = None,
|
||||||
) -> Portal:
|
) -> Portal:
|
||||||
|
'''
|
||||||
|
Start a (daemon) actor: an process that has no designated
|
||||||
|
"main task" besides the runtime.
|
||||||
|
|
||||||
|
'''
|
||||||
loglevel = loglevel or self._actor.loglevel or get_loglevel()
|
loglevel = loglevel or self._actor.loglevel or get_loglevel()
|
||||||
|
|
||||||
# configure and pass runtime state
|
# configure and pass runtime state
|
||||||
_rtv = _state._runtime_vars.copy()
|
_rtv = _state._runtime_vars.copy()
|
||||||
_rtv['_is_root'] = False
|
_rtv['_is_root'] = False
|
||||||
|
|
||||||
|
# allow setting debug policy per actor
|
||||||
|
if debug_mode is not None:
|
||||||
|
_rtv['_debug_mode'] = debug_mode
|
||||||
|
self._at_least_one_child_in_debug = True
|
||||||
|
|
||||||
enable_modules = enable_modules or []
|
enable_modules = enable_modules or []
|
||||||
|
|
||||||
if rpc_module_paths:
|
if rpc_module_paths:
|
||||||
|
@ -283,7 +294,9 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
# will make the pdb repl unusable.
|
# will make the pdb repl unusable.
|
||||||
# Instead try to wait for pdb to be released before
|
# Instead try to wait for pdb to be released before
|
||||||
# tearing down.
|
# tearing down.
|
||||||
await maybe_wait_for_debugger()
|
await maybe_wait_for_debugger(
|
||||||
|
child_in_debug=anursery._at_least_one_child_in_debug
|
||||||
|
)
|
||||||
|
|
||||||
# if the caller's scope errored then we activate our
|
# if the caller's scope errored then we activate our
|
||||||
# one-cancels-all supervisor strategy (don't
|
# one-cancels-all supervisor strategy (don't
|
||||||
|
@ -337,6 +350,12 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
|
|
||||||
) as err:
|
) as err:
|
||||||
|
|
||||||
|
# XXX: yet another guard before allowing the cancel
|
||||||
|
# sequence in case a (single) child is in debug.
|
||||||
|
await maybe_wait_for_debugger(
|
||||||
|
child_in_debug=anursery._at_least_one_child_in_debug
|
||||||
|
)
|
||||||
|
|
||||||
# If actor-local error was raised while waiting on
|
# If actor-local error was raised while waiting on
|
||||||
# ".run_in_actor()" actors then we also want to cancel all
|
# ".run_in_actor()" actors then we also want to cancel all
|
||||||
# remaining sub-actors (due to our lone strategy:
|
# remaining sub-actors (due to our lone strategy:
|
||||||
|
|
|
@ -9,8 +9,8 @@ from typing import Optional
|
||||||
from ._state import ActorContextInfo
|
from ._state import ActorContextInfo
|
||||||
|
|
||||||
|
|
||||||
_proj_name = 'tractor'
|
_proj_name: str = 'tractor'
|
||||||
_default_loglevel = 'ERROR'
|
_default_loglevel: str = 'ERROR'
|
||||||
|
|
||||||
# Super sexy formatting thanks to ``colorlog``.
|
# Super sexy formatting thanks to ``colorlog``.
|
||||||
# (NOTE: we use the '{' format style)
|
# (NOTE: we use the '{' format style)
|
||||||
|
@ -189,5 +189,5 @@ def get_console_log(
|
||||||
return log
|
return log
|
||||||
|
|
||||||
|
|
||||||
def get_loglevel() -> Optional[str]:
|
def get_loglevel() -> str:
|
||||||
return _default_loglevel
|
return _default_loglevel
|
||||||
|
|
Loading…
Reference in New Issue