Compare commits

..

1 Commits

Author SHA1 Message Date
Tyler Goodlet 547b957bbf Officially test proto-ed `stackscope` integration
By re-purposing our `pexpect`-based console matching with a new
`debugging/shield_hang_in_sub.py` example, this tests a few "hanging
actor" conditions more formally:

- that despite a hanging actor's task we can dump
  a `stackscope.extract()` tree on relay of `SIGUSR1`.
- the actor tree will terminate despite a shielded forever-sleep by our
  "T-800" zombie reaper machinery activating and hard killing the
  underlying subprocess.

Some test details:
- simulates the expected actions of a real user by manually using
  `os.kill()` to send both signals to the actor-tree program.
- `pexpect`-matches against `log.devx()` emissions under normal
  `debug_mode == True` usage.
- ensure we get the actual "T-800 deployed" `log.error()` msg and
  that the actor tree eventually terminates!

Surrounding (re-org/impl/test-suite) changes:
- allow disabling usage via a `maybe_enable_greenback: bool` to
  `open_root_actor()` but enable by default.
- pretty up the actual `.devx()` content from `.devx._stackscope`
  including being extra pedantic about the conc-primitives for each signal
  event.
- try to avoid double handles of `SIGUSR1` even though it seems the
  original (what i thought was a) problem was actually just double
  logging in the handler..
  |_ avoid double applying the handler func via `signal.signal()`,
  |_ use a global to avoid double handle func calls and,
  |_ a `threading.RLock` around handling.
- move common fixtures and helper routines from `test_debugger` to
  `tests/devx/conftest.py` and import them for use in both test mods.
2024-07-10 18:17:42 -04:00
7 changed files with 23 additions and 178 deletions

View File

@ -1,81 +0,0 @@
'''
Verify we can dump a `stackscope` tree on a hang.
'''
import os
import signal
import trio
import tractor
@tractor.context
async def start_n_shield_hang(
    ctx: tractor.Context,
):
    '''
    Deliver this process' pid to the parent-side task via
    `ctx.started()`, then sleep forever inside a shielded
    `trio.CancelScope`.

    '''
    # sync to parent-side task, handing back our pid as the
    # "started" value.
    this_pid = os.getpid()
    await ctx.started(this_pid)

    print('Entering shield sleep..')

    # XXX NOTE: since this scope shields, we expect the zombie
    # reaper (aka T800) to engage on SIGINT from the user and
    # eventually hard-kill this subprocess!
    shielded_scope = trio.CancelScope(shield=True)
    with shielded_scope:
        await trio.sleep_forever()  # in subactor
async def main(
    from_test: bool = False,
) -> None:
    '''
    Open an actor nursery, start a single 'hanger' subactor running
    `start_n_shield_hang()` and then either simulate a user's signal
    inputs or wait for a real user to provide them.

    When `from_test` is set, `SIGUSR1` is sent to the child pid and,
    after a 0.5s sleep, `SIGINT` is sent to this process; otherwise
    this task just sleeps forever.

    '''
    async with tractor.open_nursery(
        debug_mode=True,
        enable_stack_on_sig=True,
        # maybe_enable_greenback=False,
        loglevel='devx',
    ) as an:
        portal: tractor.Portal = await an.start_actor(
            'hanger',
            enable_modules=[__name__],
            debug_mode=True,
        )
        async with portal.open_context(
            start_n_shield_hang,
        ) as (ctx, child_pid):

            # the pid reported by the child task must match the
            # `proc` entry the nursery holds for that child.
            _, proc, _ = an._children[portal.chan.uid]
            assert child_pid == proc.pid

            print(
                'Yo my child hanging..?\n'
                'Sending SIGUSR1 to see a tree-trace!\n'
            )

            if not from_test:
                # actually let user send the ctl-c
                await trio.sleep_forever()  # in root
            else:
                # XXX simulate the wrapping test's "user actions"
                # (i.e. if a human didn't run this manually but
                # wants to know what they should do to reproduce
                # test behaviour)
                os.kill(child_pid, signal.SIGUSR1)

                # simulate user cancelling program
                await trio.sleep(0.5)
                os.kill(os.getpid(), signal.SIGINT)
# script entry: only start the `trio` run when executed directly,
# not when imported (e.g. as an enabled module in the subactor).
if __name__ == '__main__':
    trio.run(main)

View File

@ -21,11 +21,9 @@ import trio
import tractor import tractor
from tractor import ( from tractor import (
current_actor, current_actor,
Actor,
to_asyncio, to_asyncio,
RemoteActorError, RemoteActorError,
ContextCancelled, ContextCancelled,
_state,
) )
from tractor.trionics import BroadcastReceiver from tractor.trionics import BroadcastReceiver
from tractor._testing import expect_ctxc from tractor._testing import expect_ctxc
@ -82,16 +80,7 @@ async def asyncio_actor(
) -> None: ) -> None:
# ensure internal runtime state is consistent assert tractor.current_actor().is_infected_aio()
actor: Actor = tractor.current_actor()
assert (
actor.is_infected_aio()
and
actor._infected_aio
and
_state._runtime_vars['_is_infected_aio']
)
target: Callable = globals()[target] target: Callable = globals()[target]
if '.' in expect_err: if '.' in expect_err:
@ -147,7 +136,7 @@ def test_aio_simple_error(reg_addr):
assert err assert err
assert isinstance(err, RemoteActorError) assert isinstance(err, RemoteActorError)
assert err.boxed_type is AssertionError assert err.boxed_type == AssertionError
def test_tractor_cancels_aio(reg_addr): def test_tractor_cancels_aio(reg_addr):

View File

@ -20,7 +20,6 @@ Sub-process entry points.
""" """
from __future__ import annotations from __future__ import annotations
from functools import partial from functools import partial
import multiprocessing as mp
import os import os
import textwrap import textwrap
from typing import ( from typing import (
@ -65,22 +64,20 @@ def _mp_main(
''' '''
actor._forkserver_info = forkserver_info actor._forkserver_info = forkserver_info
from ._spawn import try_set_start_method from ._spawn import try_set_start_method
spawn_ctx: mp.context.BaseContext = try_set_start_method(start_method) spawn_ctx = try_set_start_method(start_method)
assert spawn_ctx
if actor.loglevel is not None: if actor.loglevel is not None:
log.info( log.info(
f'Setting loglevel for {actor.uid} to {actor.loglevel}' f"Setting loglevel for {actor.uid} to {actor.loglevel}")
)
get_console_log(actor.loglevel) get_console_log(actor.loglevel)
# TODO: use scops headers like for `trio` below! assert spawn_ctx
# (well after we libify it maybe..)
log.info( log.info(
f'Started new {spawn_ctx.current_process()} for {actor.uid}' f"Started new {spawn_ctx.current_process()} for {actor.uid}")
# f"parent_addr is {parent_addr}"
) _state._current_actor = actor
_state._current_actor: Actor = actor
log.debug(f"parent_addr is {parent_addr}")
trio_main = partial( trio_main = partial(
async_main, async_main,
actor=actor, actor=actor,
@ -97,9 +94,7 @@ def _mp_main(
pass # handle it the same way trio does? pass # handle it the same way trio does?
finally: finally:
log.info( log.info(f"Subactor {actor.uid} terminated")
f'`mp`-subactor {actor.uid} exited'
)
# TODO: move this func to some kinda `.devx._conc_lang.py` eventually # TODO: move this func to some kinda `.devx._conc_lang.py` eventually

View File

@ -59,7 +59,6 @@ import os
import warnings import warnings
import trio import trio
from trio._core import _run as trio_runtime
from trio import ( from trio import (
CancelScope, CancelScope,
Nursery, Nursery,
@ -81,7 +80,6 @@ from ._context import (
from .log import get_logger from .log import get_logger
from ._exceptions import ( from ._exceptions import (
ContextCancelled, ContextCancelled,
InternalError,
ModuleNotExposed, ModuleNotExposed,
MsgTypeError, MsgTypeError,
unpack_error, unpack_error,
@ -104,7 +102,6 @@ from ._rpc import (
if TYPE_CHECKING: if TYPE_CHECKING:
from ._supervise import ActorNursery from ._supervise import ActorNursery
from trio._channel import MemoryChannelState
log = get_logger('tractor') log = get_logger('tractor')
@ -900,15 +897,11 @@ class Actor:
f'peer: {chan.uid}\n' f'peer: {chan.uid}\n'
f'cid:{cid}\n' f'cid:{cid}\n'
) )
ctx._allow_overruns: bool = allow_overruns ctx._allow_overruns = allow_overruns
# adjust buffer size if specified # adjust buffer size if specified
state: MemoryChannelState = ctx._send_chan._state # type: ignore state = ctx._send_chan._state # type: ignore
if ( if msg_buffer_size and state.max_buffer_size != msg_buffer_size:
msg_buffer_size
and
state.max_buffer_size != msg_buffer_size
):
state.max_buffer_size = msg_buffer_size state.max_buffer_size = msg_buffer_size
except KeyError: except KeyError:
@ -1102,36 +1095,7 @@ class Actor:
'`tractor.pause_from_sync()` not available!' '`tractor.pause_from_sync()` not available!'
) )
# XXX ensure the "infected `asyncio` mode" setting rvs['_is_root'] = False
# passed down from our spawning parent is consistent
# with `trio`-runtime initialization:
# - during sub-proc boot, the entrypoint func
# (`._entry.<spawn_backend>_main()`) should set
# `._infected_aio = True` before calling
# `run_as_asyncio_guest()`,
# - the value of `infect_asyncio: bool = True` as
# passed to `ActorNursery.start_actor()` must be
# the same as `_runtime_vars['_is_infected_aio']`
if (
(aio_rtv := rvs['_is_infected_aio'])
!=
(aio_attr := self._infected_aio)
):
raise InternalError(
'Parent sent runtime-vars that mismatch for the '
'"infected `asyncio` mode" settings ?!?\n\n'
f'rvs["_is_infected_aio"] = {aio_rtv}\n'
f'self._infected_aio = {aio_attr}\n'
)
if aio_rtv:
assert trio_runtime.GLOBAL_RUN_CONTEXT.runner.is_guest
# ^TODO^ possibly add a `sniffio` or
# `trio` pub-API for `is_guest_mode()`?
rvs['_is_root'] = False # obvi XD
# update process-wide globals
_state._runtime_vars.update(rvs) _state._runtime_vars.update(rvs)
# XXX: ``msgspec`` doesn't support serializing tuples # XXX: ``msgspec`` doesn't support serializing tuples

View File

@ -44,8 +44,6 @@ _runtime_vars: dict[str, Any] = {
'_root_mailbox': (None, None), '_root_mailbox': (None, None),
'_registry_addrs': [], '_registry_addrs': [],
'_is_infected_aio': False,
# for `tractor.pause_from_sync()` & `breakpoint()` support # for `tractor.pause_from_sync()` & `breakpoint()` support
'use_greenback': False, 'use_greenback': False,
} }
@ -72,8 +70,7 @@ def current_actor(
''' '''
if ( if (
err_on_no_runtime err_on_no_runtime
and and _current_actor is None
_current_actor is None
): ):
msg: str = 'No local actor has been initialized yet?\n' msg: str = 'No local actor has been initialized yet?\n'
from ._exceptions import NoRuntime from ._exceptions import NoRuntime

View File

@ -158,7 +158,6 @@ class ActorNursery:
# configure and pass runtime state # configure and pass runtime state
_rtv = _state._runtime_vars.copy() _rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False _rtv['_is_root'] = False
_rtv['_is_infected_aio'] = infect_asyncio
# allow setting debug policy per actor # allow setting debug policy per actor
if debug_mode is not None: if debug_mode is not None:

View File

@ -276,10 +276,7 @@ def _run_asyncio_task(
chan._aio_task: asyncio.Task = task chan._aio_task: asyncio.Task = task
# XXX TODO XXX get this actually workin.. XD # XXX TODO XXX get this actually workin.. XD
# -[ ] we need logic to setup `greenback` for `asyncio`-side task # maybe setup `greenback` for `asyncio`-side task REPLing
# REPLing.. which should normally be nearly the same as for
# `trio`?
# -[ ] add to a new `.devx._greenback.maybe_init_for_asyncio()`?
if ( if (
debug_mode() debug_mode()
and and
@ -308,22 +305,15 @@ def _run_asyncio_task(
msg: str = ( msg: str = (
'Infected `asyncio` task {etype_str}\n' 'Infected `asyncio` task {etype_str}\n'
)
if isinstance(terr, CancelledError):
msg += (
f'c)>\n'
f'|_{task}\n' f'|_{task}\n'
) )
if isinstance(terr, CancelledError):
log.cancel( log.cancel(
msg.format(etype_str='cancelled') msg.format(etype_str='cancelled')
) )
else: else:
msg += (
f'x)>\n'
f' |_{task}\n'
)
log.exception( log.exception(
msg.format(etype_str='errored') msg.format(etype_str='cancelled')
) )
assert type(terr) is type(aio_err), ( assert type(terr) is type(aio_err), (
@ -629,10 +619,9 @@ def run_as_asyncio_guest(
# ) # )
def trio_done_callback(main_outcome): def trio_done_callback(main_outcome):
log.runtime( log.info(
f'`trio` guest-run finishing with outcome\n' f'trio_main finished with\n'
f'>) {main_outcome}\n' f'|_{main_outcome!r}'
f'|_{trio_done_fute}\n'
) )
if isinstance(main_outcome, Error): if isinstance(main_outcome, Error):
@ -654,12 +643,6 @@ def run_as_asyncio_guest(
else: else:
trio_done_fute.set_result(main_outcome) trio_done_fute.set_result(main_outcome)
log.info(
f'`trio` guest-run finished with outcome\n'
f')>\n'
f'|_{trio_done_fute}\n'
)
startup_msg += ( startup_msg += (
f'-> created {trio_done_callback!r}\n' f'-> created {trio_done_callback!r}\n'
f'-> scheduling `trio_main`: {trio_main!r}\n' f'-> scheduling `trio_main`: {trio_main!r}\n'
@ -698,8 +681,7 @@ def run_as_asyncio_guest(
# error path in `asyncio`'s runtime..? # error path in `asyncio`'s runtime..?
asyncio.CancelledError, asyncio.CancelledError,
) as _fute_err: ) as fute_err:
fute_err = _fute_err
err_message: str = ( err_message: str = (
'main `asyncio` task ' 'main `asyncio` task '
) )