Compare commits

...

3 Commits

Author SHA1 Message Date
Tyler Goodlet f7469442e3 Use "sclang"-style syntax in `to_asyncio` task logging
Just like we've started doing throughout the rest of the actor runtime
for reporting (and where "sclang" = "structured conc (s)lang", our
little supervision-focused operations syntax i've been playing with in
log msg content).

Further tweaks:
- report the `trio_done_fute` alongside the `main_outcome` value.
- add a todo list for supporting `greenback` for pause points.
2024-07-11 19:22:40 -04:00
Tyler Goodlet 8363317e11 Pass `infect_asyncio` setting via runtime-vars
The reason for this "duplication" with the `--asyncio` CLI flag (passed
to the child during spawn) is 2-fold:
- allows verifying inside `Actor._from_parent()` that the `trio` runtime was
  started via `.start_guest_run()` as well as if the
  `Actor._infected_aio` spawn-entrypoint value has been set (by the
  `._entry.<spawn-backend>_main()` whenever `--asyncio` is passed)
  such that any mismatch can be signaled via an `InternalError`.
- enables checking the `._state._runtime_vars['_is_infected_aio']` value
  directly (say from a non-actor/`trio`-thread) instead of calling
  `._state.current_actor(err_on_no_runtime=False)` in certain edge
  cases.

Impl/testing deats:
- add `._state._runtime_vars['_is_infected_aio'] = False` default.
- raise `InternalError` on any `--asyncio`-flag-passed vs.
  `_runtime_vars`-value-relayed-from-parent inside
  `Actor._from_parent()` and include a `Runner.is_guest` assert for good
  measure B)
- set and relay `infect_asyncio: bool` via runtime-vars to child in
  `ActorNursery.start_actor()`.
- verify `actor.is_infected_aio()`, `actor._infected_aio` and
  `_state._runtime_vars['_is_infected_aio']` are all set in test suite's
  `asyncio_actor()` endpoint.
2024-07-11 13:22:53 -04:00
Tyler Goodlet a628eabb30 Officially test proto-ed `stackscope` integration
By re-purposing our `pexpect`-based console matching with a new
`debugging/shield_hang_in_sub.py` example, this tests a few "hanging
actor" conditions more formally:

- that despite a hanging actor's task we can dump
  a `stackscope.extract()` tree on relay of `SIGUSR1`.
- the actor tree will terminate despite a shielded forever-sleep by our
  "T-800" zombie reaper machinery activating and hard killing the
  underlying subprocess.

Some test deats:
- simulates the expect actions of a real user by manually using
  `os.kill()` to send both signals to the actor-tree program.
- `pexpect`-matches against `log.devx()` emissions under normal
  `debug_mode == True` usage.
- ensure we get the actual "T-800 deployed" `log.error()` msg and
  that the actor tree eventually terminates!

Surrounding (re-org/impl/test-suite) changes:
- allow disabling usage via a `maybe_enable_greenback: bool` to
  `open_root_actor()` but enable by def.
- pretty up the actual `.devx()` content from `.devx._stackscope`
  including be extra pedantic about the conc-primitives for each signal
  event.
- try to avoid double handles of `SIGUSR1` even though it seems the
  original (what i thought was a) problem was actually just double
  logging in the handler..
  |_ avoid double applying the handler func via `signal.signal()`,
  |_ use a global to avoid double handle func calls and,
  |_ a `threading.RLock` around handling.
- move common fixtures and helper routines from `test_debugger` to
  `tests/devx/conftest.py` and import them for use in both test mods.
2024-07-10 19:58:27 -04:00
13 changed files with 620 additions and 226 deletions

View File

@ -0,0 +1,81 @@
'''
Verify we can dump a `stackscope` tree on a hang.
'''
import os
import signal
import trio
import tractor
@tractor.context
async def start_n_shield_hang(
ctx: tractor.Context,
):
# actor: tractor.Actor = tractor.current_actor()
# sync to parent-side task
await ctx.started(os.getpid())
print('Entering shield sleep..')
with trio.CancelScope(shield=True):
await trio.sleep_forever() # in subactor
# XXX NOTE ^^^ since this shields, we expect
# the zombie reaper (aka T800) to engage on
# SIGINT from the user and eventually hard-kill
# this subprocess!
async def main(
from_test: bool = False,
) -> None:
async with (
tractor.open_nursery(
debug_mode=True,
enable_stack_on_sig=True,
# maybe_enable_greenback=False,
loglevel='devx',
) as an,
):
ptl: tractor.Portal = await an.start_actor(
'hanger',
enable_modules=[__name__],
debug_mode=True,
)
async with ptl.open_context(
start_n_shield_hang,
) as (ctx, cpid):
_, proc, _ = an._children[ptl.chan.uid]
assert cpid == proc.pid
print(
'Yo my child hanging..?\n'
'Sending SIGUSR1 to see a tree-trace!\n'
)
# XXX simulate the wrapping test's "user actions"
# (i.e. if a human didn't run this manually but wants to
# know what they should do to reproduce test behaviour)
if from_test:
os.kill(
cpid,
signal.SIGUSR1,
)
# simulate user cancelling program
await trio.sleep(0.5)
os.kill(
os.getpid(),
signal.SIGINT,
)
else:
# actually let user send the ctl-c
await trio.sleep_forever() # in root
if __name__ == '__main__':
trio.run(main)

View File

View File

@ -0,0 +1,167 @@
'''
`tractor.devx.*` tooling sub-pkg test space.
'''
from typing import (
Callable,
)
import pytest
from pexpect.exceptions import (
TIMEOUT,
)
from tractor._testing import (
mk_cmd,
)
@pytest.fixture
def spawn(
start_method,
testdir: pytest.Testdir,
reg_addr: tuple[str, int],
) -> Callable[[str], None]:
'''
Use the `pexpect` module shipped via `testdir.spawn()` to
run an `./examples/..` script by name.
'''
if start_method != 'trio':
pytest.skip(
'`pexpect` based tests only supported on `trio` backend'
)
def _spawn(
cmd: str,
**mkcmd_kwargs,
):
return testdir.spawn(
cmd=mk_cmd(
cmd,
**mkcmd_kwargs,
),
expect_timeout=3,
)
# such that test-dep can pass input script name.
return _spawn
@pytest.fixture(
params=[False, True],
ids='ctl-c={}'.format,
)
def ctlc(
request,
ci_env: bool,
) -> bool:
use_ctlc = request.param
node = request.node
markers = node.own_markers
for mark in markers:
if mark.name == 'has_nested_actors':
pytest.skip(
f'Test {node} has nested actors and fails with Ctrl-C.\n'
f'The test can sometimes run fine locally but until'
' we solve' 'this issue this CI test will be xfail:\n'
'https://github.com/goodboy/tractor/issues/320'
)
if use_ctlc:
# XXX: disable pygments highlighting for auto-tests
# since some envs (like actions CI) will struggle
# the the added color-char encoding..
from tractor.devx._debug import TractorConfig
TractorConfig.use_pygements = False
yield use_ctlc
def expect(
child,
# normally a `pdb` prompt by default
patt: str,
**kwargs,
) -> None:
'''
Expect wrapper that prints last seen console
data before failing.
'''
try:
child.expect(
patt,
**kwargs,
)
except TIMEOUT:
before = str(child.before.decode())
print(before)
raise
def in_prompt_msg(
prompt: str,
parts: list[str],
pause_on_false: bool = False,
err_on_false: bool = False,
print_prompt_on_false: bool = True,
) -> bool:
'''
Predicate check if (the prompt's) std-streams output has all
`str`-parts in it.
Can be used in test asserts for bulk matching expected
log/REPL output for a given `pdb` interact point.
'''
__tracebackhide__: bool = False
for part in parts:
if part not in prompt:
if pause_on_false:
import pdbp
pdbp.set_trace()
if print_prompt_on_false:
print(prompt)
if err_on_false:
raise ValueError(
f'Could not find pattern: {part!r} in `before` output?'
)
return False
return True
# TODO: todo support terminal color-chars stripping so we can match
# against call stack frame output from the the 'll' command the like!
# -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789
def assert_before(
child,
patts: list[str],
**kwargs,
) -> None:
__tracebackhide__: bool = False
# as in before the prompt end
before: str = str(child.before.decode())
assert in_prompt_msg(
prompt=before,
parts=patts,
# since this is an "assert" helper ;)
err_on_false=True,
**kwargs
)

View File

@ -16,7 +16,6 @@ import platform
import time import time
import pytest import pytest
import pexpect
from pexpect.exceptions import ( from pexpect.exceptions import (
TIMEOUT, TIMEOUT,
EOF, EOF,
@ -27,12 +26,14 @@ from tractor.devx._debug import (
_crash_msg, _crash_msg,
_repl_fail_msg, _repl_fail_msg,
) )
from tractor._testing import (
mk_cmd,
)
from conftest import ( from conftest import (
_ci_env, _ci_env,
) )
from .conftest import (
expect,
in_prompt_msg,
assert_before,
)
# TODO: The next great debugger audit could be done by you! # TODO: The next great debugger audit could be done by you!
# - recurrent entry to breakpoint() from single actor *after* and an # - recurrent entry to breakpoint() from single actor *after* and an
@ -69,154 +70,9 @@ has_nested_actors = pytest.mark.has_nested_actors
# ) # )
@pytest.fixture
def spawn(
start_method,
testdir,
reg_addr,
) -> 'pexpect.spawn':
'''
Use the `pexpect` module shipped via `testdir.spawn()` to
run an `./examples/..` script by name.
'''
if start_method != 'trio':
pytest.skip(
'`pexpect` based tests only supported on `trio` backend'
)
def _spawn(
cmd: str,
**mkcmd_kwargs,
):
return testdir.spawn(
cmd=mk_cmd(
cmd,
**mkcmd_kwargs,
),
expect_timeout=3,
)
# such that test-dep can pass input script name.
return _spawn
PROMPT = r"\(Pdb\+\)" PROMPT = r"\(Pdb\+\)"
def expect(
child,
# prompt by default
patt: str = PROMPT,
**kwargs,
) -> None:
'''
Expect wrapper that prints last seen console
data before failing.
'''
try:
child.expect(
patt,
**kwargs,
)
except TIMEOUT:
before = str(child.before.decode())
print(before)
raise
def in_prompt_msg(
prompt: str,
parts: list[str],
pause_on_false: bool = False,
print_prompt_on_false: bool = True,
) -> bool:
'''
Predicate check if (the prompt's) std-streams output has all
`str`-parts in it.
Can be used in test asserts for bulk matching expected
log/REPL output for a given `pdb` interact point.
'''
__tracebackhide__: bool = False
for part in parts:
if part not in prompt:
if pause_on_false:
import pdbp
pdbp.set_trace()
if print_prompt_on_false:
print(prompt)
return False
return True
# TODO: todo support terminal color-chars stripping so we can match
# against call stack frame output from the the 'll' command the like!
# -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789
def assert_before(
child,
patts: list[str],
**kwargs,
) -> None:
__tracebackhide__: bool = False
# as in before the prompt end
before: str = str(child.before.decode())
assert in_prompt_msg(
prompt=before,
parts=patts,
**kwargs
)
@pytest.fixture(
params=[False, True],
ids='ctl-c={}'.format,
)
def ctlc(
request,
ci_env: bool,
) -> bool:
use_ctlc = request.param
node = request.node
markers = node.own_markers
for mark in markers:
if mark.name == 'has_nested_actors':
pytest.skip(
f'Test {node} has nested actors and fails with Ctrl-C.\n'
f'The test can sometimes run fine locally but until'
' we solve' 'this issue this CI test will be xfail:\n'
'https://github.com/goodboy/tractor/issues/320'
)
if use_ctlc:
# XXX: disable pygments highlighting for auto-tests
# since some envs (like actions CI) will struggle
# the the added color-char encoding..
from tractor.devx._debug import TractorConfig
TractorConfig.use_pygements = False
yield use_ctlc
@pytest.mark.parametrize( @pytest.mark.parametrize(
'user_in_out', 'user_in_out',
[ [
@ -281,7 +137,7 @@ def test_root_actor_bp(spawn, user_in_out):
child.expect('\r\n') child.expect('\r\n')
# process should exit # process should exit
child.expect(pexpect.EOF) child.expect(EOF)
if expect_err_str is None: if expect_err_str is None:
assert 'Error' not in str(child.before) assert 'Error' not in str(child.before)
@ -365,7 +221,7 @@ def test_root_actor_bp_forever(
# quit out of the loop # quit out of the loop
child.sendline('q') child.sendline('q')
child.expect(pexpect.EOF) child.expect(EOF)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -430,7 +286,7 @@ def test_subactor_error(
child.expect('\r\n') child.expect('\r\n')
# process should exit # process should exit
child.expect(pexpect.EOF) child.expect(EOF)
def test_subactor_breakpoint( def test_subactor_breakpoint(
@ -493,7 +349,7 @@ def test_subactor_breakpoint(
child.sendline('c') child.sendline('c')
# process should exit # process should exit
child.expect(pexpect.EOF) child.expect(EOF)
before = str(child.before.decode()) before = str(child.before.decode())
assert in_prompt_msg( assert in_prompt_msg(
@ -636,7 +492,7 @@ def test_multi_subactors(
# process should exit # process should exit
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
# repeat of previous multierror for final output # repeat of previous multierror for final output
assert_before(child, [ assert_before(child, [
@ -776,7 +632,7 @@ def test_multi_daemon_subactors(
) )
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
@has_nested_actors @has_nested_actors
@ -852,7 +708,7 @@ def test_multi_subactors_root_errors(
]) ])
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
assert_before(child, [ assert_before(child, [
# "Attaching to pdb in crashed actor: ('root'", # "Attaching to pdb in crashed actor: ('root'",
@ -982,7 +838,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
for i in range(3): for i in range(3):
try: try:
child.expect(pexpect.EOF, timeout=0.5) child.expect(EOF, timeout=0.5)
break break
except TIMEOUT: except TIMEOUT:
child.sendline('c') child.sendline('c')
@ -1024,7 +880,7 @@ def test_root_cancels_child_context_during_startup(
do_ctlc(child) do_ctlc(child)
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
def test_different_debug_mode_per_actor( def test_different_debug_mode_per_actor(
@ -1045,7 +901,7 @@ def test_different_debug_mode_per_actor(
do_ctlc(child) do_ctlc(child)
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
before = str(child.before.decode()) before = str(child.before.decode())
@ -1196,7 +1052,7 @@ def test_pause_from_sync(
) )
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
def test_post_mortem_api( def test_post_mortem_api(
@ -1301,7 +1157,7 @@ def test_post_mortem_api(
# ) # )
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
def test_shield_pause( def test_shield_pause(
@ -1376,7 +1232,7 @@ def test_shield_pause(
] ]
) )
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
# TODO: better error for "non-ideal" usage from the root actor. # TODO: better error for "non-ideal" usage from the root actor.

View File

@ -0,0 +1,120 @@
'''
That "native" runtime-hackin toolset better be dang useful!
Verify the funtion of a variety of "developer-experience" tools we
offer from the `.devx` sub-pkg:
- use of the lovely `stackscope` for dumping actor `trio`-task trees
during operation and hangs.
TODO:
- demonstration of `CallerInfo` call stack frame filtering such that
for logging and REPL purposes a user sees exactly the layers needed
when debugging a problem inside the stack vs. in their app.
'''
import os
import signal
from .conftest import (
expect,
assert_before,
# in_prompt_msg,
)
def test_shield_pause(
spawn,
):
'''
Verify the `tractor.pause()/.post_mortem()` API works inside an
already cancelled `trio.CancelScope` and that you can step to the
next checkpoint wherein the cancelled will get raised.
'''
child = spawn(
'shield_hang_in_sub'
)
expect(
child,
'Yo my child hanging..?',
)
assert_before(
child,
[
'Entering shield sleep..',
'Enabling trace-trees on `SIGUSR1` since `stackscope` is installed @',
]
)
print(
'Sending SIGUSR1 to see a tree-trace!',
)
os.kill(
child.pid,
signal.SIGUSR1,
)
expect(
child,
# end-of-tree delimiter
"------ \('root', ",
)
assert_before(
child,
[
'Trying to dump `stackscope` tree..',
'Dumping `stackscope` tree for actor',
"('root'", # uid line
# parent block point (non-shielded)
'await trio.sleep_forever() # in root',
]
)
# expect(
# child,
# # relay to the sub should be reported
# 'Relaying `SIGUSR1`[10] to sub-actor',
# )
expect(
child,
# end-of-tree delimiter
"------ \('hanger', ",
)
assert_before(
child,
[
# relay to the sub should be reported
'Relaying `SIGUSR1`[10] to sub-actor',
"('hanger'", # uid line
# hanger LOC where it's shield-halted
'await trio.sleep_forever() # in subactor',
]
)
# breakpoint()
# simulate the user sending a ctl-c to the hanging program.
# this should result in the terminator kicking in since
# the sub is shield blocking and can't respond to SIGINT.
os.kill(
child.pid,
signal.SIGINT,
)
expect(
child,
'Shutting down actor runtime',
timeout=6,
)
assert_before(
child,
[
'raise KeyboardInterrupt',
# 'Shutting down actor runtime',
'#T-800 deployed to collect zombie B0',
"'--uid', \"('hanger',",
]
)

View File

@ -21,9 +21,11 @@ import trio
import tractor import tractor
from tractor import ( from tractor import (
current_actor, current_actor,
Actor,
to_asyncio, to_asyncio,
RemoteActorError, RemoteActorError,
ContextCancelled, ContextCancelled,
_state,
) )
from tractor.trionics import BroadcastReceiver from tractor.trionics import BroadcastReceiver
from tractor._testing import expect_ctxc from tractor._testing import expect_ctxc
@ -80,7 +82,16 @@ async def asyncio_actor(
) -> None: ) -> None:
assert tractor.current_actor().is_infected_aio() # ensure internal runtime state is consistent
actor: Actor = tractor.current_actor()
assert (
actor.is_infected_aio()
and
actor._infected_aio
and
_state._runtime_vars['_is_infected_aio']
)
target: Callable = globals()[target] target: Callable = globals()[target]
if '.' in expect_err: if '.' in expect_err:
@ -136,7 +147,7 @@ def test_aio_simple_error(reg_addr):
assert err assert err
assert isinstance(err, RemoteActorError) assert isinstance(err, RemoteActorError)
assert err.boxed_type == AssertionError assert err.boxed_type is AssertionError
def test_tractor_cancels_aio(reg_addr): def test_tractor_cancels_aio(reg_addr):

View File

@ -20,6 +20,7 @@ Sub-process entry points.
""" """
from __future__ import annotations from __future__ import annotations
from functools import partial from functools import partial
import multiprocessing as mp
import os import os
import textwrap import textwrap
from typing import ( from typing import (
@ -64,20 +65,22 @@ def _mp_main(
''' '''
actor._forkserver_info = forkserver_info actor._forkserver_info = forkserver_info
from ._spawn import try_set_start_method from ._spawn import try_set_start_method
spawn_ctx = try_set_start_method(start_method) spawn_ctx: mp.context.BaseContext = try_set_start_method(start_method)
assert spawn_ctx
if actor.loglevel is not None: if actor.loglevel is not None:
log.info( log.info(
f"Setting loglevel for {actor.uid} to {actor.loglevel}") f'Setting loglevel for {actor.uid} to {actor.loglevel}'
)
get_console_log(actor.loglevel) get_console_log(actor.loglevel)
assert spawn_ctx # TODO: use scops headers like for `trio` below!
# (well after we libify it maybe..)
log.info( log.info(
f"Started new {spawn_ctx.current_process()} for {actor.uid}") f'Started new {spawn_ctx.current_process()} for {actor.uid}'
# f"parent_addr is {parent_addr}"
_state._current_actor = actor )
_state._current_actor: Actor = actor
log.debug(f"parent_addr is {parent_addr}")
trio_main = partial( trio_main = partial(
async_main, async_main,
actor=actor, actor=actor,
@ -94,7 +97,9 @@ def _mp_main(
pass # handle it the same way trio does? pass # handle it the same way trio does?
finally: finally:
log.info(f"Subactor {actor.uid} terminated") log.info(
f'`mp`-subactor {actor.uid} exited'
)
# TODO: move this func to some kinda `.devx._conc_lang.py` eventually # TODO: move this func to some kinda `.devx._conc_lang.py` eventually

View File

@ -80,7 +80,7 @@ async def open_root_actor(
# enables the multi-process debugger support # enables the multi-process debugger support
debug_mode: bool = False, debug_mode: bool = False,
maybe_enable_greenback: bool = False, # `.pause_from_sync()/breakpoint()` support maybe_enable_greenback: bool = True, # `.pause_from_sync()/breakpoint()` support
enable_stack_on_sig: bool = False, enable_stack_on_sig: bool = False,
# internal logging # internal logging
@ -233,14 +233,8 @@ async def open_root_actor(
and and
enable_stack_on_sig enable_stack_on_sig
): ):
try: from .devx._stackscope import enable_stack_on_sig
logger.info('Enabling `stackscope` traces on SIGUSR1') enable_stack_on_sig()
from .devx import enable_stack_on_sig
enable_stack_on_sig()
except ImportError:
logger.warning(
'`stackscope` not installed for use in debug mode!'
)
# closed into below ping task-func # closed into below ping task-func
ponged_addrs: list[tuple[str, int]] = [] ponged_addrs: list[tuple[str, int]] = []

View File

@ -59,6 +59,7 @@ import os
import warnings import warnings
import trio import trio
from trio._core import _run as trio_runtime
from trio import ( from trio import (
CancelScope, CancelScope,
Nursery, Nursery,
@ -80,6 +81,7 @@ from ._context import (
from .log import get_logger from .log import get_logger
from ._exceptions import ( from ._exceptions import (
ContextCancelled, ContextCancelled,
InternalError,
ModuleNotExposed, ModuleNotExposed,
MsgTypeError, MsgTypeError,
unpack_error, unpack_error,
@ -102,6 +104,7 @@ from ._rpc import (
if TYPE_CHECKING: if TYPE_CHECKING:
from ._supervise import ActorNursery from ._supervise import ActorNursery
from trio._channel import MemoryChannelState
log = get_logger('tractor') log = get_logger('tractor')
@ -897,11 +900,15 @@ class Actor:
f'peer: {chan.uid}\n' f'peer: {chan.uid}\n'
f'cid:{cid}\n' f'cid:{cid}\n'
) )
ctx._allow_overruns = allow_overruns ctx._allow_overruns: bool = allow_overruns
# adjust buffer size if specified # adjust buffer size if specified
state = ctx._send_chan._state # type: ignore state: MemoryChannelState = ctx._send_chan._state # type: ignore
if msg_buffer_size and state.max_buffer_size != msg_buffer_size: if (
msg_buffer_size
and
state.max_buffer_size != msg_buffer_size
):
state.max_buffer_size = msg_buffer_size state.max_buffer_size = msg_buffer_size
except KeyError: except KeyError:
@ -1095,7 +1102,36 @@ class Actor:
'`tractor.pause_from_sync()` not available!' '`tractor.pause_from_sync()` not available!'
) )
rvs['_is_root'] = False # XXX ensure the "infected `asyncio` mode" setting
# passed down from our spawning parent is consistent
# with `trio`-runtime initialization:
# - during sub-proc boot, the entrypoint func
# (`._entry.<spawn_backend>_main()`) should set
# `._infected_aio = True` before calling
# `run_as_asyncio_guest()`,
# - the value of `infect_asyncio: bool = True` as
# passed to `ActorNursery.start_actor()` must be
# the same as `_runtime_vars['_is_infected_aio']`
if (
(aio_rtv := rvs['_is_infected_aio'])
!=
(aio_attr := self._infected_aio)
):
raise InternalError(
'Parent sent runtime-vars that mismatch for the '
'"infected `asyncio` mode" settings ?!?\n\n'
f'rvs["_is_infected_aio"] = {aio_rtv}\n'
f'self._infected_aio = {aio_attr}\n'
)
if aio_rtv:
assert trio_runtime.GLOBAL_RUN_CONTEXT.runner.is_guest
# ^TODO^ possibly add a `sniffio` or
# `trio` pub-API for `is_guest_mode()`?
rvs['_is_root'] = False # obvi XD
# update process-wide globals
_state._runtime_vars.update(rvs) _state._runtime_vars.update(rvs)
# XXX: ``msgspec`` doesn't support serializing tuples # XXX: ``msgspec`` doesn't support serializing tuples

View File

@ -44,6 +44,8 @@ _runtime_vars: dict[str, Any] = {
'_root_mailbox': (None, None), '_root_mailbox': (None, None),
'_registry_addrs': [], '_registry_addrs': [],
'_is_infected_aio': False,
# for `tractor.pause_from_sync()` & `breakpoint()` support # for `tractor.pause_from_sync()` & `breakpoint()` support
'use_greenback': False, 'use_greenback': False,
} }
@ -70,7 +72,8 @@ def current_actor(
''' '''
if ( if (
err_on_no_runtime err_on_no_runtime
and _current_actor is None and
_current_actor is None
): ):
msg: str = 'No local actor has been initialized yet?\n' msg: str = 'No local actor has been initialized yet?\n'
from ._exceptions import NoRuntime from ._exceptions import NoRuntime

View File

@ -158,6 +158,7 @@ class ActorNursery:
# configure and pass runtime state # configure and pass runtime state
_rtv = _state._runtime_vars.copy() _rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False _rtv['_is_root'] = False
_rtv['_is_infected_aio'] = infect_asyncio
# allow setting debug policy per actor # allow setting debug policy per actor
if debug_mode is not None: if debug_mode is not None:

View File

@ -24,13 +24,24 @@ disjoint, parallel executing tasks in separate actors.
''' '''
from __future__ import annotations from __future__ import annotations
# from functools import partial
from threading import (
current_thread,
Thread,
RLock,
)
import multiprocessing as mp import multiprocessing as mp
from signal import ( from signal import (
signal, signal,
getsignal,
SIGUSR1, SIGUSR1,
) )
import traceback # import traceback
from typing import TYPE_CHECKING from types import ModuleType
from typing import (
Callable,
TYPE_CHECKING,
)
import trio import trio
from tractor import ( from tractor import (
@ -51,26 +62,45 @@ if TYPE_CHECKING:
@trio.lowlevel.disable_ki_protection @trio.lowlevel.disable_ki_protection
def dump_task_tree() -> None: def dump_task_tree() -> None:
import stackscope '''
from tractor.log import get_console_log Do a classic `stackscope.extract()` task-tree dump to console at
`.devx()` level.
'''
import stackscope
tree_str: str = str( tree_str: str = str(
stackscope.extract( stackscope.extract(
trio.lowlevel.current_root_task(), trio.lowlevel.current_root_task(),
recurse_child_tasks=True recurse_child_tasks=True
) )
) )
log = get_console_log(
name=__name__,
level='cancel',
)
actor: Actor = _state.current_actor() actor: Actor = _state.current_actor()
thr: Thread = current_thread()
log.devx( log.devx(
f'Dumping `stackscope` tree for actor\n' f'Dumping `stackscope` tree for actor\n'
f'{actor.name}: {actor}\n' f'{actor.uid}:\n'
f' |_{mp.current_process()}\n\n' f'|_{mp.current_process()}\n'
f' |_{thr}\n'
f' |_{actor}\n\n'
# start-of-trace-tree delimiter (mostly for testing)
'------ - ------\n'
'\n'
+
f'{tree_str}\n' f'{tree_str}\n'
+
# end-of-trace-tree delimiter (mostly for testing)
f'\n'
f'------ {actor.uid!r} ------\n'
) )
# TODO: can remove this right?
# -[ ] was original code from author
#
# print(
# 'DUMPING FROM PRINT\n'
# +
# content
# )
# import logging # import logging
# try: # try:
# with open("/dev/tty", "w") as tty: # with open("/dev/tty", "w") as tty:
@ -80,58 +110,130 @@ def dump_task_tree() -> None:
# "task_tree" # "task_tree"
# ).exception("Error printing task tree") # ).exception("Error printing task tree")
_handler_lock = RLock()
_tree_dumped: bool = False
def signal_handler(
def dump_tree_on_sig(
sig: int, sig: int,
frame: object, frame: object,
relay_to_subs: bool = True, relay_to_subs: bool = True,
) -> None: ) -> None:
try: global _tree_dumped, _handler_lock
trio.lowlevel.current_trio_token( with _handler_lock:
).run_sync_soon(dump_task_tree) if _tree_dumped:
except RuntimeError: log.warning(
# not in async context -- print a normal traceback 'Already dumped for this actor...??'
traceback.print_stack() )
return
_tree_dumped = True
# actor: Actor = _state.current_actor()
log.devx(
'Trying to dump `stackscope` tree..\n'
)
try:
dump_task_tree()
# await actor._service_n.start_soon(
# partial(
# trio.to_thread.run_sync,
# dump_task_tree,
# )
# )
# trio.lowlevel.current_trio_token().run_sync_soon(
# dump_task_tree
# )
except RuntimeError:
log.exception(
'Failed to dump `stackscope` tree..\n'
)
# not in async context -- print a normal traceback
# traceback.print_stack()
raise
except BaseException:
log.exception(
'Failed to dump `stackscope` tree..\n'
)
raise
log.devx(
'Supposedly we dumped just fine..?'
)
if not relay_to_subs: if not relay_to_subs:
return return
an: ActorNursery an: ActorNursery
for an in _state.current_actor()._actoruid2nursery.values(): for an in _state.current_actor()._actoruid2nursery.values():
subproc: ProcessType subproc: ProcessType
subactor: Actor subactor: Actor
for subactor, subproc, _ in an._children.values(): for subactor, subproc, _ in an._children.values():
log.devx( log.warning(
f'Relaying `SIGUSR1`[{sig}] to sub-actor\n' f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
f'{subactor}\n' f'{subactor}\n'
f' |_{subproc}\n' f' |_{subproc}\n'
) )
if isinstance(subproc, trio.Process): # bc of course stdlib can't have a std API.. XD
subproc.send_signal(sig) match subproc:
case trio.Process():
subproc.send_signal(sig)
elif isinstance(subproc, mp.Process): case mp.Process():
subproc._send_signal(sig) subproc._send_signal(sig)
def enable_stack_on_sig( def enable_stack_on_sig(
sig: int = SIGUSR1 sig: int = SIGUSR1,
) -> None: ) -> ModuleType:
''' '''
Enable `stackscope` tracing on reception of a signal; by Enable `stackscope` tracing on reception of a signal; by
default this is SIGUSR1. default this is SIGUSR1.
HOT TIP: a task/ctx-tree dump can be triggered from a shell with
fancy cmds.
For ex. from `bash` using `pgrep` and cmd-sustitution
(https://www.gnu.org/software/bash/manual/bash.html#Command-Substitution)
you could use:
>> kill -SIGUSR1 $(pgrep -f '<cmd>')
Or with with `xonsh` (which has diff capture-from-subproc syntax)
>> kill -SIGUSR1 @$(pgrep -f '<cmd>')
''' '''
try:
import stackscope
except ImportError:
log.warning(
'`stackscope` not installed for use in debug mode!'
)
return None
handler: Callable|int = getsignal(sig)
if handler is dump_tree_on_sig:
log.devx(
'A `SIGUSR1` handler already exists?\n'
f'|_ {handler!r}\n'
)
return
signal( signal(
sig, sig,
signal_handler, dump_tree_on_sig,
) )
# NOTE: not the above can be triggered from log.devx(
# a (xonsh) shell using: 'Enabling trace-trees on `SIGUSR1` '
# kill -SIGUSR1 @$(pgrep -f '<cmd>') 'since `stackscope` is installed @ \n'
# f'{stackscope!r}\n\n'
# for example if you were looking to trace a `pytest` run f'With `SIGUSR1` handler\n'
# kill -SIGUSR1 @$(pgrep -f 'pytest') f'|_{dump_tree_on_sig}\n'
)
return stackscope

View File

@ -276,7 +276,10 @@ def _run_asyncio_task(
chan._aio_task: asyncio.Task = task chan._aio_task: asyncio.Task = task
# XXX TODO XXX get this actually workin.. XD # XXX TODO XXX get this actually workin.. XD
# maybe setup `greenback` for `asyncio`-side task REPLing # -[ ] we need logic to setup `greenback` for `asyncio`-side task
# REPLing.. which should normally be nearly the same as for
# `trio`?
# -[ ] add to a new `.devx._greenback.maybe_init_for_asyncio()`?
if ( if (
debug_mode() debug_mode()
and and
@ -305,15 +308,22 @@ def _run_asyncio_task(
msg: str = ( msg: str = (
'Infected `asyncio` task {etype_str}\n' 'Infected `asyncio` task {etype_str}\n'
f'|_{task}\n'
) )
if isinstance(terr, CancelledError): if isinstance(terr, CancelledError):
msg += (
f'c)>\n'
f' |_{task}\n'
)
log.cancel( log.cancel(
msg.format(etype_str='cancelled') msg.format(etype_str='cancelled')
) )
else: else:
msg += (
f'x)>\n'
f' |_{task}\n'
)
log.exception( log.exception(
msg.format(etype_str='cancelled') msg.format(etype_str='errored')
) )
assert type(terr) is type(aio_err), ( assert type(terr) is type(aio_err), (
@ -619,9 +629,10 @@ def run_as_asyncio_guest(
# ) # )
def trio_done_callback(main_outcome): def trio_done_callback(main_outcome):
log.info( log.runtime(
f'trio_main finished with\n' f'`trio` guest-run finishing with outcome\n'
f'|_{main_outcome!r}' f'>) {main_outcome}\n'
f'|_{trio_done_fute}\n'
) )
if isinstance(main_outcome, Error): if isinstance(main_outcome, Error):
@ -643,6 +654,12 @@ def run_as_asyncio_guest(
else: else:
trio_done_fute.set_result(main_outcome) trio_done_fute.set_result(main_outcome)
log.info(
f'`trio` guest-run finished with outcome\n'
f')>\n'
f'|_{trio_done_fute}\n'
)
startup_msg += ( startup_msg += (
f'-> created {trio_done_callback!r}\n' f'-> created {trio_done_callback!r}\n'
f'-> scheduling `trio_main`: {trio_main!r}\n' f'-> scheduling `trio_main`: {trio_main!r}\n'
@ -681,7 +698,8 @@ def run_as_asyncio_guest(
# error path in `asyncio`'s runtime..? # error path in `asyncio`'s runtime..?
asyncio.CancelledError, asyncio.CancelledError,
) as fute_err: ) as _fute_err:
fute_err = _fute_err
err_message: str = ( err_message: str = (
'main `asyncio` task ' 'main `asyncio` task '
) )