Compare commits

..

37 Commits

Author SHA1 Message Date
Tyler Goodlet 4d675deb24 `_root`: drop unused `typing` import 2025-03-18 12:23:40 -04:00
Tyler Goodlet 31297171fc Use `import <name> as <name>,` style over `__all__` in pkg mod 2025-03-18 12:23:39 -04:00
Tyler Goodlet d7a9ddd4a9 Log chan-server-startup failures via `.exception()` 2025-03-18 12:22:47 -04:00
Tyler Goodlet 43b84c99b6 `.discovery.get_arbiter()`: add warning around this now deprecated usage 2025-03-18 12:22:47 -04:00
Tyler Goodlet b7f2258f15 Add `open_root_actor(ensure_registry: bool)`
Allows forcing the opened actor to either obtain the passed registry
addrs or raise a runtime error.
2025-03-18 12:22:47 -04:00
Tyler Goodlet 1dfa109879 Fix doc string "its" typo.. 2025-03-18 12:22:47 -04:00
Tyler Goodlet 0453e3565e Test with `any(portals)` since `gather_contexts()` will return `list[None | tuple]` 2025-03-18 12:22:47 -04:00
Tyler Goodlet 0bf13c50b4 Change remaining internals to use `Actor.reg_addrs` 2025-03-18 12:22:25 -04:00
Tyler Goodlet 2e69aa0f67 Expose per-actor registry addrs via `.reg_addrs`
Since it's handy to be able to debug the *writing* of this instance var
(particularly when checking state passed down to a child in
`Actor._from_parent()`), rename and wrap the underlying
`Actor._reg_addrs` as a settable `@property` and add validation to
the `.setter` for sanity - actor discovery is a critical functionality.

Other tweaks:
- fix `.cancel_soon()` to pass expected argument..
- update internal runtime error message to be simpler and link to GH issues.
- use new `Actor.reg_addrs` throughout core.
2025-03-18 12:22:25 -04:00
Tyler Goodlet fc9c7e6e3f Get remaining suites passing..
..by ensuring `reg_addr` fixture value passthrough to subactor eps
2025-03-18 12:22:25 -04:00
Tyler Goodlet 354a4c2226 Always dynamically re-read the `._root._default_lo_addrs` value in `find_actor()` 2025-03-18 12:22:25 -04:00
Tyler Goodlet d2c88e9709 Ensure `registry_addrs` is always set to something 2025-03-18 12:22:25 -04:00
Tyler Goodlet ef179b69f2 Rename fixture `arb_addr` -> `reg_addr` and set the session value globally as `._root._default_lo_addrs` 2025-03-18 12:22:25 -04:00
Tyler Goodlet 837602a011 Facepalm, `wait_for_actor()` dun take an addr `list`.. 2025-03-18 12:22:25 -04:00
Tyler Goodlet debacef30e ._root: set a `_default_lo_addrs` and apply it when not provided by caller 2025-03-18 12:21:47 -04:00
Tyler Goodlet 7ae405ef5a Always set default reg addr in `find_actor()` if not defined 2025-03-18 12:21:47 -04:00
Tyler Goodlet e428bf0a34 Oof, default reg addrs needs to be in `list[tuple]` form.. 2025-03-18 12:21:47 -04:00
Tyler Goodlet d448bb81bd Add post-mortem catch around failed transport addr binds to aid with runtime debugging 2025-03-18 12:21:47 -04:00
Tyler Goodlet 77108a9759 Rename to `parse_maddr()` and fill out doc strings 2025-03-18 12:21:47 -04:00
Tyler Goodlet 1720fefa1d Add libp2p style "multi-address" parser from `piker`
Details are in the module docs; this is a first draft with lotsa room
for refinement and extension.
2025-03-18 12:21:47 -04:00
Tyler Goodlet a3c1f8e419 Init-support for "multi homed" transports
Since we'd like to eventually allow a diverse set of transport
(protocol) methods and stacks, and a multi-peer discovery system for
distributed actor-tree applications, this reworks all runtime internals
to support multi-homing for any given tree on a logical host. In other
words any actor can now bind its transport server (currently only
unsecured TCP + `msgspec`) to more then one address available in its
(linux) network namespace. Further, registry actors (now dubbed
"registars" instead of "arbiters") can also similarly bind to multiple
network addresses and provide discovery services to remote actors via
multiple addresses which can now be provided at runtime startup.

Deats:
- adjust `._runtime` internals to use a `list[tuple[str, int]]` (and
  thus pluralized) socket address sequence where applicable for transport
  server socket binds, now exposed via `Actor.accept_addrs`:
  - `Actor.__init__()` now takes a `registry_addrs: list`.
  - `Actor.is_arbiter` -> `.is_registrar`.
  - `._arb_addr` -> `._reg_addrs: list[tuple]`.
  - always reg and de-reg from all registrars in `async_main()`.
  - only set the global runtime var `'_root_mailbox'` to the loopback
    address since normally all in-tree processes should have access to
    it, right?
  - `._serve_forever()` task now takes `listen_sockaddrs: list[tuple]`
- make `open_root_actor()` take a `registry_addrs: list[tuple[str, int]]`
  and defaults when not passed.
- change `ActorNursery.start_..()` methods take `bind_addrs: list` and
  pass down through the spawning layer(s) via the parent-seed-msg.
- generalize all `._discovery()` APIs to accept `registry_addrs`-like
  inputs and move all relevant subsystems to adopt the "registry" style
  naming instead of "arbiter":
  - make `find_actor()` support batched concurrent portal queries over
    all provided input addresses using `.trionics.gather_contexts()` Bo
  - syntax: move to using `async with <tuples>` 3.9+ style chained
    @acms.
  - a general modernization of the code to a python 3.9+ style.
  - start deprecation and change to "registry" naming / semantics:
    - `._discovery.get_arbiter()` -> `.get_registry()`
2025-03-18 12:21:45 -04:00
Tyler Goodlet db58f6e1b5 Woops, fix `_post_mortem()` type sig..
We're passing a `extra_frames_up_when_async=2` now (from prior attempt
to hide `CancelScope.__exit__()` when `shield=True`) and thus both
`debug_func`s must accept it 🤦

On the brighter side found out that the `TypeError` from the call-sig
mismatch was actually being swallowed entirely so add some
`.exception()` msgs for such cases to at least alert the dev they broke
stuff XD
2025-03-16 23:24:52 -04:00
Tyler Goodlet 76b7006977 Add `shield: bool` support to `.pause()`
It's been on the todo for a while and I've given up trying to properly
hide the `trio.CancelScope.__exit__()` frame for now instead opting to
just `log.pdb()` a big apology XD

Users can obvi still just not use the flag and wrap `tractor.pause()` in
their own cs block if they want to avoid having to hit `'up'` in the pdb
REPL if needed in a cancelled task-scope.

Impl deatz:
- factor orig `.pause()` impl into new `._pause()` so that we can more tersely
  wrap the original content depending on `shield: bool` input; only open
  the cancel-scope when shield is set to avoid aforemented extra strack
  frame annoyance.
- pass through `shield` to underlying `_pause` and `debug_func()` so we
  can actually know when so log our apology.
- add a buncha notes to new `.pause()` wrapper regarding the inability
  to hide the cancel-scope `.__exit__()`, inluding that overriding the
  code in `trio._core._run.CancelScope` doesn't seem to solve the issue
  either..

Unrelated `maybe_wait_for_debugger()` tweaks:
- don't read `Lock.global_actor_in_debug` more then needed, rename local
  read var to `in_debug` (since it can also hold the root actor uid, not
  just sub-actors).
- shield the `await debug_complete.wait()` since ideally we avoid the
  root cancellation child-actors in debug even when the root calls this
  func in a cancelled scope.
2025-03-16 23:24:52 -04:00
Tyler Goodlet bd1885bce1 Mk debugger tests work for arbitrary pre-REPL format
Since this was changed as part of overall project wide logging format
updates, and i ended up changing the both the crash and pause `.pdb()`
msgs to include some multi-line-ascii-"stuff", might as well make the
pre-prompt checks in the test suite more flexible to match.

As such, this exposes 2 new constants inside the `.devx._debug` mod:
- `._pause_msg: str` for the pre `tractor.pause()` header emitted via
  `log.pdb()` and,
- `._crash_msg: str` for the pre `._post_mortem()` equiv when handling
  errors in debug mode.

Adjust the test suite to use these values and thus make us more capable
to absorb changes in the future as well:
- add a new `in_prompt_msg()` predicate, very similar to `assert_before()`
  but minus `assert`s which takes in a `parts: list[str]` to match
  in the pre-prompt stdout.
- delegate to `in_prompt_msg()` in `assert_before()` since it was mostly
  duplicate minus `assert`.
- adjust all previous `<patt> in before` asserts to instead use
  `in_prompt_msg()` with separated pre-prompt-header vs. actor-name
  `parts`.
- use new `._pause/crash_msg` values in all such calls including any
  `assert_before()` cases.
2025-03-16 23:24:51 -04:00
Tyler Goodlet 066a35322e Support `maybe_wait_for_debugger(header_msg: str)`
Allow callers to stick in a header to the `.pdb()` level emitted msg(s)
such that any "waiting status" content is only shown if the caller
actually get's blocked waiting for the debug lock; use it inside the
`._spawn` sub-process reaper call.

Also, return early if `Lock.global_actor_in_debug == None` and thus
only enter the poll loop when actually needed, consequently raise
if we fall through the loop without acquisition.
2025-03-16 23:22:40 -04:00
Tyler Goodlet 2ebc30d708 Fix `.devx.maybe_wait_for_debugger()` polling deats
When entered by the root actor avoid excessive polling cycles by,
- blocking on the `Lock.no_remote_has_tty: trio.Event` and breaking
  *immediately* when set (though we should really also lock
  it from the root right?) to avoid extra loops..
- shielding the `await trio.sleep(poll_delay)` call to avoid any local
  cancellation causing the (presumably root-actor task) caller to move
  on (possibly to cancel its children) and instead to continue
  poll-blocking until the lock is actually released by its user.
- `break` the poll loop immediately if no remote locker is detected.
- use `.pdb()` level for reporting lock state changes.

Also add a #TODO to handle calls by non-root actors as it pertains to
2025-03-16 23:22:40 -04:00
Tyler Goodlet 57a5b7eb6f Add `stackscope` tree pprinter triggered by SIGUSR1
Can be optionally enabled via a new `enable_stack_on_sig()` which will
swap in the SIGUSR1 handler. Much thanks to @oremanj for writing this
amazing project, it's thus far helped me fix some very subtle hangs
inside our new IPC-context cancellation machinery that would have
otherwise taken much more manual pdb-ing and hair pulling XD

Full credit for `dump_task_tree()` goes to the original project author
with some minor tweaks as was handed to me via the trio-general matrix
room B)

Slight changes from orig version:
- use a `log.pdb()` emission to pprint to console
- toss in an ex sh CLI cmd to trigger the dump from another terminal
  using `kill` + `pgrep`.
2025-03-16 23:22:40 -04:00
Tyler Goodlet e269aa3751 Only use `greenback` if actor-runtime is up.. 2025-03-16 23:05:15 -04:00
Tyler Goodlet 7fc9297104 Ignore `greenback` import error if not installed 2025-03-16 23:05:15 -04:00
Tyler Goodlet 9208708b3a Change old `._debug._pause()` name, cherry to #362 re `greenback` 2025-03-16 23:05:15 -04:00
Tyler Goodlet cf2f2adec2 Runtime import `.get_root()` in stdin hijacker to avoid import cycle 2025-03-16 23:05:15 -04:00
Tyler Goodlet f28abc6720 Ignore kbis in `open_crash_handler()` by default 2025-03-16 23:05:15 -04:00
Tyler Goodlet 6f33a9891e Comment all `.pause(shield=True)` attempts again, need to solve cancel scope `.__exit__()` frame hiding issue.. 2025-03-16 23:05:15 -04:00
Tyler Goodlet 79604b7f98 Add shielding support to `.pause()`
Implement it like you'd expect using simply a wrapping
`trio.CancelScope` which is itself shielded by the input `shield: bool`
B)

There's seemingly still some issues with the frame selection when the
REPL engages and not sure how to resolve it yet but at least this does
indeed work for practical purposes. Still needs a test obviously!
2025-03-16 23:05:15 -04:00
Tyler Goodlet cec4a2a0ab Move `maybe_open_crash_handler()` CLI `--pdb`-driven wrapper to debug mod 2025-03-16 23:05:15 -04:00
Tyler Goodlet 4089e4b3ac Start `.devx.cli` extensions for pop CLI frameworks
Starting of with just a `typer` (and thus transitively `click`)
`typer.Typer.callback` hook which allows passthrough of the `--ll
<loglevel: str>` and `--pdb <debug_mode: bool>` flags for use when
building CLIs that use the runtime Bo

Still needs lotsa refinement and obviously better docs but, the doc
string for `load_runtime_vars()` shows how to use the underlying
`.devx._debug.open_crash_handler()` via a wrapper that can be passed the
`--pdb` flag and then enable debug mode throughout the entire actor
system.
2025-03-16 23:05:15 -04:00
Tyler Goodlet 5ec48310b6 Kick off `.devx` subpkg for our dev tools B)
Where `.devx` is "developer experience", a hopefully broad enough subpkg
name for all the slick stuff planned to augment working on the actor
runtime 💥

Move the `._debug` module into the new subpkg and adjust rest of core
code base to reflect import path change. Also add a new
`.devx._debug.open_crash_handler()` manager for wrapping any sync code
outside a `trio.run()` which is handy for eventual CLI addons for
popular frameworks like `click`/`typer`.
2025-03-16 23:05:14 -04:00
10 changed files with 951 additions and 333 deletions

View File

@ -10,6 +10,7 @@ TODO:
- wonder if any of it'll work on OS X?
"""
from functools import partial
import itertools
from typing import Optional
import platform
@ -26,6 +27,10 @@ from pexpect.exceptions import (
from tractor._testing import (
examples_dir,
)
from tractor.devx._debug import (
_pause_msg,
_crash_msg,
)
from conftest import (
_ci_env,
)
@ -123,20 +128,52 @@ def expect(
raise
def in_prompt_msg(
prompt: str,
parts: list[str],
pause_on_false: bool = False,
print_prompt_on_false: bool = True,
) -> bool:
'''
Predicate check if (the prompt's) std-streams output has all
`str`-parts in it.
Can be used in test asserts for bulk matching expected
log/REPL output for a given `pdb` interact point.
'''
for part in parts:
if part not in prompt:
if pause_on_false:
import pdbp
pdbp.set_trace()
if print_prompt_on_false:
print(prompt)
return False
return True
def assert_before(
child,
patts: list[str],
**kwargs,
) -> None:
before = str(child.before.decode())
# as in before the prompt end
before: str = str(child.before.decode())
assert in_prompt_msg(
prompt=before,
parts=patts,
for patt in patts:
try:
assert patt in before
except AssertionError:
print(before)
raise
**kwargs
)
@pytest.fixture(
@ -195,7 +232,10 @@ def test_root_actor_error(spawn, user_in_out):
before = str(child.before.decode())
# make sure expected logging and error arrives
assert "Attaching to pdb in crashed actor: ('root'" in before
assert in_prompt_msg(
before,
[_crash_msg, "('root'"]
)
assert 'AssertionError' in before
# send user command
@ -332,7 +372,10 @@ def test_subactor_error(
child.expect(PROMPT)
before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before
assert in_prompt_msg(
before,
[_crash_msg, "('name_error'"]
)
if do_next:
child.sendline('n')
@ -353,9 +396,15 @@ def test_subactor_error(
before = str(child.before.decode())
# root actor gets debugger engaged
assert "Attaching to pdb in crashed actor: ('root'" in before
assert in_prompt_msg(
before,
[_crash_msg, "('root'"]
)
# error is a remote error propagated from the subactor
assert "RemoteActorError: ('name_error'" in before
assert in_prompt_msg(
before,
[_crash_msg, "('name_error'"]
)
# another round
if ctlc:
@ -380,7 +429,10 @@ def test_subactor_breakpoint(
child.expect(PROMPT)
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
assert in_prompt_msg(
before,
[_pause_msg, "('breakpoint_forever'"]
)
# do some "next" commands to demonstrate recurrent breakpoint
# entries
@ -396,7 +448,10 @@ def test_subactor_breakpoint(
child.sendline('continue')
child.expect(PROMPT)
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
assert in_prompt_msg(
before,
[_pause_msg, "('breakpoint_forever'"]
)
if ctlc:
do_ctlc(child)
@ -441,7 +496,10 @@ def test_multi_subactors(
child.expect(PROMPT)
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
assert in_prompt_msg(
before,
[_pause_msg, "('breakpoint_forever'"]
)
if ctlc:
do_ctlc(child)
@ -461,7 +519,10 @@ def test_multi_subactors(
# first name_error failure
child.expect(PROMPT)
before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before
assert in_prompt_msg(
before,
[_crash_msg, "('name_error'"]
)
assert "NameError" in before
if ctlc:
@ -487,7 +548,10 @@ def test_multi_subactors(
child.sendline('c')
child.expect(PROMPT)
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
assert in_prompt_msg(
before,
[_pause_msg, "('breakpoint_forever'"]
)
if ctlc:
do_ctlc(child)
@ -527,9 +591,12 @@ def test_multi_subactors(
child.expect(PROMPT)
before = str(child.before.decode())
assert_before(child, [
assert_before(
child, [
# debugger attaches to root
"Attaching to pdb in crashed actor: ('root'",
# "Attaching to pdb in crashed actor: ('root'",
_crash_msg,
"('root'",
# expect a multierror with exceptions for each sub-actor
"RemoteActorError: ('breakpoint_forever'",
@ -537,7 +604,8 @@ def test_multi_subactors(
"RemoteActorError: ('spawn_error'",
"RemoteActorError: ('name_error_1'",
'bdb.BdbQuit',
])
]
)
if ctlc:
do_ctlc(child)
@ -574,15 +642,22 @@ def test_multi_daemon_subactors(
# the root's tty lock first so anticipate either crash
# message on the first entry.
bp_forever_msg = "Attaching pdb to actor: ('bp_forever'"
bp_forev_parts = [_pause_msg, "('bp_forever'"]
bp_forev_in_msg = partial(
in_prompt_msg,
parts=bp_forev_parts,
)
name_error_msg = "NameError: name 'doggypants' is not defined"
name_error_parts = [name_error_msg]
before = str(child.before.decode())
if bp_forever_msg in before:
next_msg = name_error_msg
if bp_forev_in_msg(prompt=before):
next_parts = name_error_parts
elif name_error_msg in before:
next_msg = bp_forever_msg
next_parts = bp_forev_parts
else:
raise ValueError("Neither log msg was found !?")
@ -599,7 +674,10 @@ def test_multi_daemon_subactors(
child.sendline('c')
child.expect(PROMPT)
assert_before(child, [next_msg])
assert_before(
child,
next_parts,
)
# XXX: hooray the root clobbering the child here was fixed!
# IMO, this demonstrates the true power of SC system design.
@ -623,9 +701,15 @@ def test_multi_daemon_subactors(
child.expect(PROMPT)
try:
assert_before(child, [bp_forever_msg])
assert_before(
child,
bp_forev_parts,
)
except AssertionError:
assert_before(child, [name_error_msg])
assert_before(
child,
name_error_parts,
)
else:
if ctlc:
@ -637,7 +721,10 @@ def test_multi_daemon_subactors(
child.sendline('c')
child.expect(PROMPT)
assert_before(child, [name_error_msg])
assert_before(
child,
name_error_parts,
)
# wait for final error in root
# where it crashs with boxed error
@ -647,7 +734,7 @@ def test_multi_daemon_subactors(
child.expect(PROMPT)
assert_before(
child,
[bp_forever_msg]
bp_forev_parts
)
except AssertionError:
break
@ -656,7 +743,9 @@ def test_multi_daemon_subactors(
child,
[
# boxed error raised in root task
"Attaching to pdb in crashed actor: ('root'",
# "Attaching to pdb in crashed actor: ('root'",
_crash_msg,
"('root'",
"_exceptions.RemoteActorError: ('name_error'",
]
)
@ -770,7 +859,7 @@ def test_multi_nested_subactors_error_through_nurseries(
child = spawn('multi_nested_subactors_error_up_through_nurseries')
timed_out_early: bool = False
# timed_out_early: bool = False
for send_char in itertools.cycle(['c', 'q']):
try:
@ -871,11 +960,14 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
if not timed_out_early:
before = str(child.before.decode())
assert_before(child, [
assert_before(
child,
[
"tractor._exceptions.RemoteActorError: ('spawner0'",
"tractor._exceptions.RemoteActorError: ('name_error'",
"NameError: name 'doggypants' is not defined",
])
],
)
def test_root_cancels_child_context_during_startup(
@ -909,8 +1001,10 @@ def test_different_debug_mode_per_actor(
# only one actor should enter the debugger
before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('debugged_boi'" in before
assert "RuntimeError" in before
assert in_prompt_msg(
before,
[_crash_msg, "('debugged_boi'", "RuntimeError"],
)
if ctlc:
do_ctlc(child)

View File

@ -868,6 +868,9 @@ class Context:
# TODO: maybe we should also call `._res_scope.cancel()` if it
# exists to support cancelling any drain loop hangs?
# NOTE: this usage actually works here B)
# from .devx._debug import breakpoint
# await breakpoint()
# TODO: add to `Channel`?
@property

View File

@ -37,7 +37,7 @@ from ._runtime import (
# Arbiter as Registry,
async_main,
)
from . import _debug
from .devx import _debug
from . import _spawn
from . import _state
from . import log
@ -99,7 +99,7 @@ async def open_root_actor(
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
builtin_bp_handler = sys.breakpointhook
orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None)
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug.pause_from_sync'
os.environ['PYTHONBREAKPOINT'] = 'tractor.devx._debug.pause_from_sync'
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
@ -146,7 +146,7 @@ async def open_root_actor(
# expose internal debug module to every actor allowing
# for use of ``await tractor.breakpoint()``
enable_modules.append('tractor._debug')
enable_modules.append('tractor.devx._debug')
# if debug mode get's enabled *at least* use that level of
# logging for some informative console prompts.

View File

@ -78,7 +78,7 @@ from ._exceptions import (
ContextCancelled,
TransportClosed,
)
from . import _debug
from .devx import _debug
from ._discovery import get_registry
from ._portal import Portal
from . import _state
@ -197,7 +197,7 @@ class Actor:
self._parent_main_data = _mp_fixup_main._mp_figure_out_main()
# always include debugging tools module
enable_modules.append('tractor._debug')
enable_modules.append('tractor.devx._debug')
self.enable_modules: dict[str, str] = {}
for name in enable_modules:

View File

@ -34,7 +34,7 @@ from typing import (
import trio
from trio import TaskStatus
from ._debug import (
from .devx._debug import (
maybe_wait_for_debugger,
acquire_debug_lock,
)
@ -554,13 +554,14 @@ async def trio_proc(
with trio.move_on_after(0.5):
await proc.wait()
log.pdb(
'Delaying subproc reaper while debugger locked..'
)
await maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get(
'_debug_mode', False
),
header_msg=(
'Delaying subproc reaper while debugger locked..\n'
),
# TODO: need a diff value then default?
# poll_steps=9999999,
)

View File

@ -28,7 +28,7 @@ import warnings
import trio
from ._debug import maybe_wait_for_debugger
from .devx._debug import maybe_wait_for_debugger
from ._state import current_actor, is_main_process
from .log import get_logger, get_loglevel
from ._runtime import Actor

View File

@ -0,0 +1,37 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Runtime "developer experience" utils and addons to aid our
(advanced) users and core devs in building distributed applications
and working with/on the actor runtime.
"""
from ._debug import (
maybe_wait_for_debugger as maybe_wait_for_debugger,
acquire_debug_lock as acquire_debug_lock,
breakpoint as breakpoint,
pause as pause,
pause_from_sync as pause_from_sync,
shield_sigint_handler as shield_sigint_handler,
MultiActorPdb as MultiActorPdb,
open_crash_handler as open_crash_handler,
maybe_open_crash_handler as maybe_open_crash_handler,
post_mortem as post_mortem,
)
from ._stackscope import (
enable_stack_on_sig as enable_stack_on_sig,
)

View File

@ -1,18 +1,19 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation, either version 3 of
# the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
"""
Multi-core debugging for da peeps!
@ -20,14 +21,19 @@ Multi-core debugging for da peeps!
"""
from __future__ import annotations
import bdb
import os
import sys
import signal
from contextlib import (
asynccontextmanager as acm,
contextmanager as cm,
nullcontext,
)
from functools import (
partial,
cached_property,
)
from contextlib import asynccontextmanager as acm
import os
import signal
import sys
import traceback
from typing import (
Any,
Callable,
@ -39,27 +45,31 @@ from types import FrameType
import pdbp
import tractor
import trio
from trio.lowlevel import current_task
from trio_typing import (
TaskStatus,
# Task,
)
from .log import get_logger
from ._discovery import get_root
from ._state import (
from ..log import get_logger
from .._state import (
current_actor,
is_root_process,
debug_mode,
)
from ._exceptions import (
from .._exceptions import (
is_multi_cancelled,
ContextCancelled,
)
from ._ipc import Channel
from .._ipc import Channel
log = get_logger(__name__)
__all__ = ['breakpoint', 'post_mortem']
__all__ = [
'breakpoint',
'post_mortem',
]
class Lock:
@ -85,12 +95,12 @@ class Lock:
# and must be cancelled if this actor is cancelled via IPC
# request-message otherwise deadlocks with the parent actor may
# ensure
_debugger_request_cs: trio.CancelScope | None = None
_debugger_request_cs: trio.CancelScope|None = None
# NOTE: set only in the root actor for the **local** root spawned task
# which has acquired the lock (i.e. this is on the callee side of
# the `lock_tty_for_child()` context entry).
_root_local_task_cs_in_debug: trio.CancelScope | None = None
_root_local_task_cs_in_debug: trio.CancelScope|None = None
# actor tree-wide actor uid that supposedly has the tty lock
global_actor_in_debug: tuple[str, str] = None
@ -232,7 +242,7 @@ async def _acquire_debug_lock_from_root_task(
to the ``pdb`` repl.
'''
task_name: str = trio.lowlevel.current_task().name
task_name: str = current_task().name
we_acquired: bool = False
log.runtime(
@ -317,14 +327,13 @@ async def lock_tty_for_child(
highly reliable at releasing the mutex complete!
'''
task_name = trio.lowlevel.current_task().name
task_name: str = current_task().name
if tuple(subactor_uid) in Lock._blocked:
log.warning(
f'Actor {subactor_uid} is blocked from acquiring debug lock\n'
f"remote task: {task_name}:{subactor_uid}"
)
ctx._enter_debugger_on_cancel = False
ctx._enter_debugger_on_cancel: bool = False
await ctx.cancel(f'Debug lock blocked for {subactor_uid}')
return 'pdb_lock_blocked'
@ -375,12 +384,14 @@ async def wait_for_parent_stdin_hijack(
This function is used by any sub-actor to acquire mutex access to
the ``pdb`` REPL and thus the root's TTY for interactive debugging
(see below inside ``_pause()``). It can be used to ensure that
(see below inside ``pause()``). It can be used to ensure that
an intermediate nursery-owning actor does not clobber its children
if they are in debug (see below inside
``maybe_wait_for_debugger()``).
'''
from .._discovery import get_root
with trio.CancelScope(shield=True) as cs:
Lock._debugger_request_cs = cs
@ -390,7 +401,7 @@ async def wait_for_parent_stdin_hijack(
# this syncs to child's ``Context.started()`` call.
async with portal.open_context(
tractor._debug.lock_tty_for_child,
lock_tty_for_child,
subactor_uid=actor_uid,
) as (ctx, val):
@ -399,11 +410,13 @@ async def wait_for_parent_stdin_hijack(
assert val == 'Locked'
async with ctx.open_stream() as stream:
# unblock local caller
try:
# unblock local caller
assert Lock.local_pdb_complete
task_status.started(cs)
# wait for local task to exit and
# release the REPL
await Lock.local_pdb_complete.wait()
finally:
@ -441,171 +454,6 @@ def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
return pdb, Lock.unshield_sigint
async def _pause(
debug_func: Callable | None = None,
release_lock_signal: trio.Event | None = None,
# TODO:
# shield: bool = False
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED
) -> None:
'''
A pause point (more commonly known as a "breakpoint") interrupt
instruction for engaging a blocking debugger instance to
conduct manual console-based-REPL-interaction from within
`tractor`'s async runtime, normally from some single-threaded
and currently executing actor-hosted-`trio`-task in some
(remote) process.
NOTE: we use the semantics "pause" since it better encompasses
the entirety of the necessary global-runtime-state-mutation any
actor-task must access and lock in order to get full isolated
control over the process tree's root TTY:
https://en.wikipedia.org/wiki/Breakpoint
'''
__tracebackhide__ = True
actor = tractor.current_actor()
pdb, undo_sigint = mk_mpdb()
task_name = trio.lowlevel.current_task().name
# TODO: is it possible to debug a trio.Cancelled except block?
# right now it seems like we can kinda do with by shielding
# around ``tractor.breakpoint()`` but not if we move the shielded
# scope here???
# with trio.CancelScope(shield=shield):
# await trio.lowlevel.checkpoint()
if (
not Lock.local_pdb_complete
or Lock.local_pdb_complete.is_set()
):
Lock.local_pdb_complete = trio.Event()
# TODO: need a more robust check for the "root" actor
if (
not is_root_process()
and actor._parent_chan # a connected child
):
if Lock.local_task_in_debug:
# Recurrence entry case: this task already has the lock and
# is likely recurrently entering a breakpoint
if Lock.local_task_in_debug == task_name:
# noop on recurrent entry case but we want to trigger
# a checkpoint to allow other actors error-propagate and
# potetially avoid infinite re-entries in some subactor.
await trio.lowlevel.checkpoint()
return
# if **this** actor is already in debug mode block here
# waiting for the control to be released - this allows
# support for recursive entries to `tractor.breakpoint()`
log.warning(f"{actor.uid} already has a debug lock, waiting...")
await Lock.local_pdb_complete.wait()
await trio.sleep(0.1)
# mark local actor as "in debug mode" to avoid recurrent
# entries/requests to the root process
Lock.local_task_in_debug = task_name
# this **must** be awaited by the caller and is done using the
# root nursery so that the debugger can continue to run without
# being restricted by the scope of a new task nursery.
# TODO: if we want to debug a trio.Cancelled triggered exception
# we have to figure out how to avoid having the service nursery
# cancel on this task start? I *think* this works below:
# ```python
# actor._service_n.cancel_scope.shield = shield
# ```
# but not entirely sure if that's a sane way to implement it?
try:
with trio.CancelScope(shield=True):
await actor._service_n.start(
wait_for_parent_stdin_hijack,
actor.uid,
)
Lock.repl = pdb
except RuntimeError:
Lock.release()
if actor._cancel_called:
# service nursery won't be usable and we
# don't want to lock up the root either way since
# we're in (the midst of) cancellation.
return
raise
elif is_root_process():
# we also wait in the root-parent for any child that
# may have the tty locked prior
# TODO: wait, what about multiple root tasks acquiring it though?
if Lock.global_actor_in_debug == actor.uid:
# re-entrant root process already has it: noop.
return
# XXX: since we need to enter pdb synchronously below,
# we have to release the lock manually from pdb completion
# callbacks. Can't think of a nicer way then this atm.
if Lock._debug_lock.locked():
log.warning(
'Root actor attempting to shield-acquire active tty lock'
f' owned by {Lock.global_actor_in_debug}')
# must shield here to avoid hitting a ``Cancelled`` and
# a child getting stuck bc we clobbered the tty
with trio.CancelScope(shield=True):
await Lock._debug_lock.acquire()
else:
# may be cancelled
await Lock._debug_lock.acquire()
Lock.global_actor_in_debug = actor.uid
Lock.local_task_in_debug = task_name
Lock.repl = pdb
try:
# breakpoint()
if debug_func is None:
# assert release_lock_signal, (
# 'Must pass `release_lock_signal: trio.Event` if no '
# 'trace func provided!'
# )
print(f"{actor.uid} ENTERING WAIT")
task_status.started()
# with trio.CancelScope(shield=True):
# await release_lock_signal.wait()
else:
# block here one (at the appropriate frame *up*) where
# ``breakpoint()`` was awaited and begin handling stdio.
log.debug("Entering the synchronous world of pdb")
debug_func(actor, pdb)
except bdb.BdbQuit:
Lock.release()
raise
# XXX: apparently we can't do this without showing this frame
# in the backtrace on first entry to the REPL? Seems like an odd
# behaviour that should have been fixed by now. This is also why
# we scrapped all the @cm approaches that were tried previously.
# finally:
# __tracebackhide__ = True
# # frame = sys._getframe()
# # last_f = frame.f_back
# # last_f.f_globals['__tracebackhide__'] = True
# # signal.signal = pdbp.hideframe(signal.signal)
def shield_sigint_handler(
signum: int,
frame: 'frame', # type: ignore # noqa
@ -625,7 +473,7 @@ def shield_sigint_handler(
uid_in_debug: tuple[str, str] | None = Lock.global_actor_in_debug
actor = tractor.current_actor()
actor = current_actor()
# print(f'{actor.uid} in HANDLER with ')
def do_cancel():
@ -764,27 +612,62 @@ def shield_sigint_handler(
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
_pause_msg: str = 'Attaching to pdb REPL in actor'
def _set_trace(
actor: tractor.Actor | None = None,
pdb: MultiActorPdb | None = None,
):
__tracebackhide__ = True
actor: tractor.Actor = actor or tractor.current_actor()
shield: bool = False,
# start 2 levels up in user code
frame: FrameType | None = sys._getframe()
if frame:
extra_frames_up_when_async: int = 1,
):
__tracebackhide__: bool = True
actor: tractor.Actor = actor or current_actor()
# always start 1 level up from THIS in user code.
frame: FrameType|None
if frame := sys._getframe():
frame: FrameType = frame.f_back # type: ignore
if (
frame
and pdb
and (
pdb
and actor is not None
)
# or shield
):
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n")
msg: str = _pause_msg
if shield:
# log.warning(
msg = (
'\n\n'
' ------ - ------\n'
'Debugger invoked with `shield=True` so an extra\n'
'`trio.CancelScope.__exit__()` frame is shown..\n'
'\n'
'Try going up one frame to see your pause point!\n'
'\n'
' SORRY we need to fix this!\n'
' ------ - ------\n\n'
) + msg
# pdbp.set_trace()
# TODO: maybe print the actor supervion tree up to the
# root here? Bo
log.pdb(
f'{msg}\n'
'|\n'
f'|_ {actor.uid}\n'
)
# no f!#$&* idea, but when we're in async land
# we need 2x frames up?
frame = frame.f_back
for i in range(extra_frames_up_when_async):
frame: FrameType = frame.f_back
log.debug(
f'Going up frame {i} -> {frame}\n'
)
else:
pdb, undo_sigint = mk_mpdb()
@ -794,18 +677,278 @@ def _set_trace(
Lock.local_task_in_debug = 'sync'
pdb.set_trace(frame=frame)
# undo_
# TODO: allow pausing from sync code, normally by remapping
# python's builtin breakpoint() hook to this runtime aware version.
async def _pause(
debug_func: Callable = _set_trace,
release_lock_signal: trio.Event | None = None,
# TODO: allow caller to pause despite task cancellation,
# exactly the same as wrapping with:
# with CancelScope(shield=True):
# await pause()
# => the REMAINING ISSUE is that the scope's .__exit__() frame
# is always show in the debugger on entry.. and there seems to
# be no way to override it?..
# shield: bool = False,
shield: bool = False,
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED
) -> None:
'''
Inner impl for `pause()` to avoid the `trio.CancelScope.__exit__()`
stack frame when not shielded (since apparently i can't figure out
how to hide it using the normal mechanisms..)
Hopefully we won't need this in the long run.
'''
__tracebackhide__: bool = True
actor = current_actor()
pdb, undo_sigint = mk_mpdb()
task_name: str = trio.lowlevel.current_task().name
if (
not Lock.local_pdb_complete
or Lock.local_pdb_complete.is_set()
):
Lock.local_pdb_complete = trio.Event()
debug_func = partial(
debug_func,
)
# TODO: need a more robust check for the "root" actor
if (
not is_root_process()
and actor._parent_chan # a connected child
):
if Lock.local_task_in_debug:
# Recurrence entry case: this task already has the lock and
# is likely recurrently entering a breakpoint
if Lock.local_task_in_debug == task_name:
# noop on recurrent entry case but we want to trigger
# a checkpoint to allow other actors error-propagate and
# potetially avoid infinite re-entries in some subactor.
await trio.lowlevel.checkpoint()
return
# if **this** actor is already in debug mode block here
# waiting for the control to be released - this allows
# support for recursive entries to `tractor.breakpoint()`
log.warning(f"{actor.uid} already has a debug lock, waiting...")
await Lock.local_pdb_complete.wait()
await trio.sleep(0.1)
# mark local actor as "in debug mode" to avoid recurrent
# entries/requests to the root process
Lock.local_task_in_debug = task_name
# this **must** be awaited by the caller and is done using the
# root nursery so that the debugger can continue to run without
# being restricted by the scope of a new task nursery.
# TODO: if we want to debug a trio.Cancelled triggered exception
# we have to figure out how to avoid having the service nursery
# cancel on this task start? I *think* this works below:
# ```python
# actor._service_n.cancel_scope.shield = shield
# ```
# but not entirely sure if that's a sane way to implement it?
try:
with trio.CancelScope(shield=True):
await actor._service_n.start(
wait_for_parent_stdin_hijack,
actor.uid,
)
Lock.repl = pdb
except RuntimeError:
Lock.release()
if actor._cancel_called:
# service nursery won't be usable and we
# don't want to lock up the root either way since
# we're in (the midst of) cancellation.
return
raise
elif is_root_process():
# we also wait in the root-parent for any child that
# may have the tty locked prior
# TODO: wait, what about multiple root tasks acquiring it though?
if Lock.global_actor_in_debug == actor.uid:
# re-entrant root process already has it: noop.
return
# XXX: since we need to enter pdb synchronously below,
# we have to release the lock manually from pdb completion
# callbacks. Can't think of a nicer way then this atm.
if Lock._debug_lock.locked():
log.warning(
'Root actor attempting to shield-acquire active tty lock'
f' owned by {Lock.global_actor_in_debug}')
# must shield here to avoid hitting a ``Cancelled`` and
# a child getting stuck bc we clobbered the tty
with trio.CancelScope(shield=True):
await Lock._debug_lock.acquire()
else:
# may be cancelled
await Lock._debug_lock.acquire()
Lock.global_actor_in_debug = actor.uid
Lock.local_task_in_debug = task_name
Lock.repl = pdb
try:
# TODO: do we want to support using this **just** for the
# locking / common code (prolly to help address #320)?
#
# if debug_func is None:
# assert release_lock_signal, (
# 'Must pass `release_lock_signal: trio.Event` if no '
# 'trace func provided!'
# )
# print(f"{actor.uid} ENTERING WAIT")
# with trio.CancelScope(shield=True):
# await release_lock_signal.wait()
# else:
# block here one (at the appropriate frame *up*) where
# ``breakpoint()`` was awaited and begin handling stdio.
log.debug('Entering sync world of the `pdb` REPL..')
try:
debug_func(
actor,
pdb,
extra_frames_up_when_async=2,
shield=shield,
)
except BaseException:
log.exception(
'Failed to invoke internal `debug_func = '
f'{debug_func.func.__name__}`\n'
)
raise
except bdb.BdbQuit:
Lock.release()
raise
except BaseException:
log.exception(
'Failed to engage debugger via `_pause()` ??\n'
)
raise
# XXX: apparently we can't do this without showing this frame
# in the backtrace on first entry to the REPL? Seems like an odd
# behaviour that should have been fixed by now. This is also why
# we scrapped all the @cm approaches that were tried previously.
# finally:
# __tracebackhide__ = True
# # frame = sys._getframe()
# # last_f = frame.f_back
# # last_f.f_globals['__tracebackhide__'] = True
# # signal.signal = pdbp.hideframe(signal.signal)
async def pause(
debug_func: Callable = _set_trace,
release_lock_signal: trio.Event | None = None,
# TODO: allow caller to pause despite task cancellation,
# exactly the same as wrapping with:
# with CancelScope(shield=True):
# await pause()
# => the REMAINING ISSUE is that the scope's .__exit__() frame
# is always show in the debugger on entry.. and there seems to
# be no way to override it?..
# shield: bool = False,
shield: bool = False,
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED
) -> None:
'''
A pause point (more commonly known as a "breakpoint") interrupt
instruction for engaging a blocking debugger instance to
conduct manual console-based-REPL-interaction from within
`tractor`'s async runtime, normally from some single-threaded
and currently executing actor-hosted-`trio`-task in some
(remote) process.
NOTE: we use the semantics "pause" since it better encompasses
the entirety of the necessary global-runtime-state-mutation any
actor-task must access and lock in order to get full isolated
control over the process tree's root TTY:
https://en.wikipedia.org/wiki/Breakpoint
'''
__tracebackhide__: bool = True
if shield:
# NOTE XXX: even hard coding this inside the `class CancelScope:`
# doesn't seem to work for me!?
# ^ XXX ^
# def _exit(self, *args, **kwargs):
# __tracebackhide__: bool = True
# super().__exit__(*args, **kwargs)
trio.CancelScope.__enter__.__tracebackhide__ = True
trio.CancelScope.__exit__.__tracebackhide__ = True
# import types
# with trio.CancelScope(shield=shield) as cs:
# cs.__exit__ = types.MethodType(_exit, cs)
# cs.__exit__.__tracebackhide__ = True
with trio.CancelScope(shield=shield) as cs:
# setattr(cs.__exit__.__func__, '__tracebackhide__', True)
# setattr(cs.__enter__.__func__, '__tracebackhide__', True)
# NOTE: so the caller can always cancel even if shielded
task_status.started(cs)
return await _pause(
debug_func=debug_func,
release_lock_signal=release_lock_signal,
shield=True,
task_status=task_status,
)
else:
return await _pause(
debug_func=debug_func,
release_lock_signal=release_lock_signal,
shield=False,
task_status=task_status,
)
# TODO: allow pausing from sync code.
# normally by remapping python's builtin breakpoint() hook to this
# runtime aware version which takes care of all .
def pause_from_sync() -> None:
print("ENTER SYNC PAUSE")
actor: tractor.Actor = current_actor(
err_on_no_runtime=False,
)
if actor:
try:
import greenback
__tracebackhide__ = True
# __tracebackhide__ = True
actor: tractor.Actor = tractor.current_actor()
# task_can_release_tty_lock = trio.Event()
# spawn bg task which will lock out the TTY, we poll
@ -818,8 +961,11 @@ def pause_from_sync() -> None:
# release_lock_signal=task_can_release_tty_lock,
))
)
except ModuleNotFoundError:
log.warning('NO GREENBACK FOUND')
else:
log.warning('Not inside actor-runtime')
db, undo_sigint = mk_mpdb()
Lock.local_task_in_debug = 'sync'
@ -854,11 +1000,7 @@ def pause_from_sync() -> None:
# using the "pause" semantics instead since
# that better covers actually somewhat "pausing the runtime"
# for this particular paralell task to do debugging B)
pause = partial(
_pause,
_set_trace,
)
pp = pause # short-hand for "pause point"
# pp = pause # short-hand for "pause point"
async def breakpoint(**kwargs):
@ -869,9 +1011,18 @@ async def breakpoint(**kwargs):
await pause(**kwargs)
_crash_msg: str = (
'Attaching to pdb REPL in crashed actor'
)
def _post_mortem(
actor: tractor.Actor,
pdb: MultiActorPdb,
shield: bool = False,
# only for compat with `._set_trace()`..
extra_frames_up_when_async=0,
) -> None:
'''
@ -879,20 +1030,28 @@ def _post_mortem(
debugger instance.
'''
log.pdb(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
# TODO: print the actor supervion tree up to the root
# here! Bo
log.pdb(
f'{_crash_msg}\n'
'|\n'
f'|_ {actor.uid}\n'
)
# TODO: you need ``pdbpp`` master (at least this commit
# https://github.com/pdbpp/pdbpp/commit/b757794857f98d53e3ebbe70879663d7d843a6c2)
# to fix this and avoid the hang it causes. See issue:
# https://github.com/pdbpp/pdbpp/issues/480
# TODO: help with a 3.10+ major release if/when it arrives.
pdbp.xpm(Pdb=lambda: pdb)
# TODO: only replacing this to add the
# `end=''` to the print XD
# pdbp.xpm(Pdb=lambda: pdb)
info = sys.exc_info()
print(traceback.format_exc(), end='')
pdbp.post_mortem(
t=info[2],
Pdb=lambda: pdb,
)
post_mortem = partial(
_pause,
_post_mortem,
pause,
debug_func=_post_mortem,
)
@ -933,9 +1092,10 @@ async def acquire_debug_lock(
'''
Grab root's debug lock on entry, release on exit.
This helper is for actor's who don't actually need
to acquired the debugger but want to wait until the
lock is free in the process-tree root.
This helper is for actor's who don't actually need to acquired
the debugger but want to wait until the lock is free in the
process-tree root such that they don't clobber an ongoing pdb
REPL session in some peer or child!
'''
if not debug_mode():
@ -956,14 +1116,18 @@ async def maybe_wait_for_debugger(
poll_delay: float = 0.1,
child_in_debug: bool = False,
) -> None:
header_msg: str = '',
) -> bool: # was locked and we polled?
if (
not debug_mode()
and not child_in_debug
):
return
return False
msg: str = header_msg
if (
is_root_process()
):
@ -973,41 +1137,147 @@ async def maybe_wait_for_debugger(
# will make the pdb repl unusable.
# Instead try to wait for pdb to be released before
# tearing down.
sub_in_debug: tuple[str, str] | None = None
in_debug: tuple[str, str]|None = Lock.global_actor_in_debug
debug_complete: trio.Event|None = Lock.no_remote_has_tty
for _ in range(poll_steps):
if Lock.global_actor_in_debug:
sub_in_debug = tuple(Lock.global_actor_in_debug)
log.debug('Root polling for debug')
with trio.CancelScope(shield=True):
await trio.sleep(poll_delay)
# TODO: could this make things more deterministic? wait
# to see if a sub-actor task will be scheduled and grab
# the tty lock on the next tick?
# XXX: doesn't seem to work
# await trio.testing.wait_all_tasks_blocked(cushion=0)
debug_complete = Lock.no_remote_has_tty
if (
debug_complete
and sub_in_debug is not None
and not debug_complete.is_set()
):
log.pdb(
'Root has errored but pdb is in use by '
f'child {sub_in_debug}\n'
'Waiting on tty lock to release..'
if in_debug == current_actor().uid:
log.debug(
msg
+
'Root already owns the TTY LOCK'
)
return True
await debug_complete.wait()
await trio.sleep(poll_delay)
continue
elif in_debug:
msg += (
f'Debug `Lock` in use by subactor: {in_debug}\n'
)
# TODO: could this make things more deterministic?
# wait to see if a sub-actor task will be
# scheduled and grab the tty lock on the next
# tick?
# XXX => but it doesn't seem to work..
# await trio.testing.wait_all_tasks_blocked(cushion=0)
else:
log.debug(
'Root acquired TTY LOCK'
msg
+
'Root immediately acquired debug TTY LOCK'
)
return False
for istep in range(poll_steps):
if (
debug_complete
and not debug_complete.is_set()
and in_debug is not None
):
log.pdb(
msg
+
'Root is waiting on tty lock to release..\n'
)
with trio.CancelScope(shield=True):
await debug_complete.wait()
log.pdb(
f'Child subactor released debug lock\n'
f'|_{in_debug}\n'
)
# is no subactor locking debugger currently?
if (
in_debug is None
and (
debug_complete is None
or debug_complete.is_set()
)
):
log.pdb(
msg
+
'Root acquired tty lock!'
)
break
else:
# TODO: don't need this right?
# await trio.lowlevel.checkpoint()
log.debug(
'Root polling for debug:\n'
f'poll step: {istep}\n'
f'poll delya: {poll_delay}'
)
with trio.CancelScope(shield=True):
await trio.sleep(poll_delay)
continue
# fallthrough on failure to acquire..
# else:
# raise RuntimeError(
# msg
# +
# 'Root actor failed to acquire debug lock?'
# )
return True
# else:
# # TODO: non-root call for #320?
# this_uid: tuple[str, str] = current_actor().uid
# async with acquire_debug_lock(
# subactor_uid=this_uid,
# ):
# pass
return False
# TODO: better naming and what additionals?
# - [ ] optional runtime plugging?
# - [ ] detection for sync vs. async code?
# - [ ] specialized REPL entry when in distributed mode?
# - [x] allow ignoring kbi Bo
@cm
def open_crash_handler(
catch: set[BaseException] = {
Exception,
BaseException,
},
ignore: set[BaseException] = {
KeyboardInterrupt,
},
):
'''
Generic "post mortem" crash handler using `pdbp` REPL debugger.
We expose this as a CLI framework addon to both `click` and
`typer` users so they can quickly wrap cmd endpoints which get
automatically wrapped to use the runtime's `debug_mode: bool`
AND `pdbp.pm()` around any code that is PRE-runtime entry
- any sync code which runs BEFORE the main call to
`trio.run()`.
'''
try:
yield
except tuple(catch) as err:
if type(err) not in ignore:
pdbp.xpm()
raise
@cm
def maybe_open_crash_handler(pdb: bool = False):
'''
Same as `open_crash_handler()` but with bool input flag
to allow conditional handling.
Normally this is used with CLI endpoints such that if the --pdb
flag is passed the pdb REPL is engaed on any crashes B)
'''
rtctx = nullcontext
if pdb:
rtctx = open_crash_handler
with rtctx():
yield

View File

@ -0,0 +1,84 @@
# tractor: structured concurrent "actors".
# Copyright eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
The fundamental cross process SC abstraction: an inter-actor,
cancel-scope linked task "context".
A ``Context`` is very similar to the ``trio.Nursery.cancel_scope`` built
into each ``trio.Nursery`` except it links the lifetimes of memory space
disjoint, parallel executing tasks in separate actors.
'''
from signal import (
signal,
SIGUSR1,
)
import trio
@trio.lowlevel.disable_ki_protection
def dump_task_tree() -> None:
import stackscope
from tractor.log import get_console_log
tree_str: str = str(
stackscope.extract(
trio.lowlevel.current_root_task(),
recurse_child_tasks=True
)
)
log = get_console_log('cancel')
log.pdb(
f'Dumping `stackscope` tree:\n\n'
f'{tree_str}\n'
)
# import logging
# try:
# with open("/dev/tty", "w") as tty:
# tty.write(tree_str)
# except BaseException:
# logging.getLogger(
# "task_tree"
# ).exception("Error printing task tree")
def signal_handler(sig: int, frame: object) -> None:
import traceback
try:
trio.lowlevel.current_trio_token(
).run_sync_soon(dump_task_tree)
except RuntimeError:
# not in async context -- print a normal traceback
traceback.print_stack()
def enable_stack_on_sig(
sig: int = SIGUSR1
) -> None:
'''
Enable `stackscope` tracing on reception of a signal; by
default this is SIGUSR1.
'''
signal(
sig,
signal_handler,
)
# NOTE: not the above can be triggered from
# a (xonsh) shell using:
# kill -SIGUSR1 @$(pgrep -f '<cmd>')

129
tractor/devx/cli.py 100644
View File

@ -0,0 +1,129 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
CLI framework extensions for hacking on the actor runtime.
Currently popular frameworks supported are:
- `typer` via the `@callback` API
"""
from __future__ import annotations
from typing import (
Any,
Callable,
)
from typing_extensions import Annotated
import typer
_runtime_vars: dict[str, Any] = {}
def load_runtime_vars(
ctx: typer.Context,
callback: Callable,
pdb: bool = False, # --pdb
ll: Annotated[
str,
typer.Option(
'--loglevel',
'-l',
help='BigD logging level',
),
] = 'cancel', # -l info
):
'''
Maybe engage crash handling with `pdbp` when code inside
a `typer` CLI endpoint cmd raises.
To use this callback simply take your `app = typer.Typer()` instance
and decorate this function with it like so:
.. code:: python
from tractor.devx import cli
app = typer.Typer()
# manual decoration to hook into `click`'s context system!
cli.load_runtime_vars = app.callback(
invoke_without_command=True,
)
And then you can use the now augmented `click` CLI context as so,
.. code:: python
@app.command(
context_settings={
"allow_extra_args": True,
"ignore_unknown_options": True,
}
)
def my_cli_cmd(
ctx: typer.Context,
):
rtvars: dict = ctx.runtime_vars
pdb: bool = rtvars['pdb']
with tractor.devx.cli.maybe_open_crash_handler(pdb=pdb):
trio.run(
partial(
my_tractor_main_task_func,
debug_mode=pdb,
loglevel=rtvars['ll'],
)
)
which will enable log level and debug mode globally for the entire
`tractor` + `trio` runtime thereafter!
Bo
'''
global _runtime_vars
_runtime_vars |= {
'pdb': pdb,
'll': ll,
}
ctx.runtime_vars: dict[str, Any] = _runtime_vars
print(
f'`typer` sub-cmd: {ctx.invoked_subcommand}\n'
f'`tractor` runtime vars: {_runtime_vars}'
)
# XXX NOTE XXX: hackzone.. if no sub-cmd is specified (the
# default if the user just invokes `bigd`) then we simply
# invoke the sole `_bigd()` cmd passing in the "parent"
# typer.Context directly to that call since we're treating it
# as a "non sub-command" or wtv..
# TODO: ideally typer would have some kinda built-in way to get
# this behaviour without having to construct and manually
# invoke our own cmd..
if (
ctx.invoked_subcommand is None
or ctx.invoked_subcommand == callback.__name__
):
cmd: typer.core.TyperCommand = typer.core.TyperCommand(
name='bigd',
callback=callback,
)
ctx.params = {'ctx': ctx}
cmd.invoke(ctx)