forked from goodboy/tractor
1
0
Fork 0

Get multi-threaded sync-pausing fully workin!

The final issue was making sure we do the same thing on ctl-c/SIGINT
from the user. That is, if there's already a bg-thread in REPL, we
`log.pdb()` about SIGINT shielding and re-draw the prompt; the same UX
as normal actor-runtime-task behaviour.

Reasons this wasn't workin.. and the fix:
- `.pause_from_sync()` was overriding the local `repl` var with `None`
  delivered by (transitive) calls to `_pause(debug_func=None)`.. so
  remove all that and only assign it OAOO prior to thread-type case
  branching.
- always call `DebugStatus.shield_sigint()` as needed from all requesting
  threads/tasks:
  - in `_pause_from_bg_root_thread()` BEFORE calling `._pause()` AND BEFORE
    yielding back to the bg-thread via `.started(out)` to ensure we're
    definitely overriding the handler in the `trio`-main-thread task
    before unblocking the requesting bg-thread.
  - from any requesting bg-thread in the root actor such that both its
    main-`trio`-thread scheduled task (as per above bullet) AND it are
    SIGINT shielded.
  - always call `.shield_sigint()` BEFORE any `greenback._await()` case
    don't entirely grok why yet, but it works)?
  - for `greenback._await()` case always set `bg_task` to the current one..
- tweaks to the `SIGINT` handler, now renamed `sigint_shield()` so as
  not to name-collide with the methods when editor-searching:
  - always try to `repr()` the REPL thread/task "owner" as well as the
    active `PdbREPL` instance.
  - add `.devx()` notes around the prompt flushing deats and comments
    for any root-actor-bg-thread edge cases.

Related/supporting refinements:
- add `get_lock()`/`get_debug_req()` factory funcs since the plan is to
  eventually implement both as `@singleton` instances per actor.
- fix `acquire_debug_lock()`'s call-sig-bug for scheduling
  `request_root_stdio_lock()`..
- in `._pause()` only call `mk_pdb()` when `debug_func != None`.
- add some todo/warning notes around the `cls.repl = None` in
  `DebugStatus.release()`

`test_pause_from_sync()` tweaks:
- don't use a `attach_patts.copy()`, since we always `break` on match.
- do `pytest.fail()` on that ^ loop's fallthrough..
- pass `do_ctlc(child, patt=attach_key)` such that we always match the
  the current thread's name with the ctl-c triggered `.pdb()` emission.
- oh yeah, return the last `before: str` from `do_ctlc()`.
- in the script, flip `abandon_on_cancel=True` since when `False` it
  seems to cause `trio.run()` to hang on exit from the last bg-thread
  case?!?
aio_abandons
Tyler Goodlet 2024-07-08 20:57:41 -04:00
parent bef3dd9e97
commit fc95c6719f
4 changed files with 211 additions and 56 deletions

View File

@ -4,6 +4,13 @@ import time
import trio
import tractor
# TODO: only import these when not running from test harness?
# can we detect `pexpect` usage maybe?
# from tractor.devx._debug import (
# get_lock,
# get_debug_req,
# )
def sync_pause(
use_builtin: bool = False,
@ -18,7 +25,13 @@ def sync_pause(
breakpoint(hide_tb=hide_tb)
else:
# TODO: maybe for testing some kind of cm style interface
# where the `._set_trace()` call doesn't happen until block
# exit?
# assert get_lock().ctx_in_debug is None
# assert get_debug_req().repl is None
tractor.pause_from_sync()
# assert get_debug_req().repl is None
if error:
raise RuntimeError('yoyo sync code error')
@ -41,10 +54,11 @@ async def start_n_sync_pause(
async def main() -> None:
async with (
tractor.open_nursery(
# NOTE: required for pausing from sync funcs
maybe_enable_greenback=True,
debug_mode=True,
# loglevel='cancel',
maybe_enable_greenback=True,
enable_stack_on_sig=True,
# loglevel='warning',
# loglevel='devx',
) as an,
trio.open_nursery() as tn,
):
@ -138,7 +152,9 @@ async def main() -> None:
# the case 2. from above still exists!
use_builtin=True,
),
abandon_on_cancel=False,
# TODO: with this `False` we can hang!??!
# abandon_on_cancel=False,
abandon_on_cancel=True,
thread_name='inline_root_bg_thread',
)

View File

@ -299,7 +299,9 @@ def do_ctlc(
# needs some further investigation potentially...
expect_prompt: bool = not _ci_env,
) -> None:
) -> str|None:
before: str|None = None
# make sure ctl-c sends don't do anything but repeat output
for _ in range(count):
@ -309,15 +311,18 @@ def do_ctlc(
# TODO: figure out why this makes CI fail..
# if you run this test manually it works just fine..
if expect_prompt:
before = str(child.before.decode())
time.sleep(delay)
child.expect(PROMPT)
before = str(child.before.decode())
time.sleep(delay)
if patt:
# should see the last line on console
assert patt in before
# return the console content up to the final prompt
return before
def test_root_actor_bp_forever(
spawn,
@ -1085,10 +1090,10 @@ def test_pause_from_sync(
)
if ctlc:
do_ctlc(child)
# ^NOTE^ subactor not spawned yet; don't need extra delay.
child.sendline('c')
# first `await tractor.pause()` inside `p.open_context()` body
child.expect(PROMPT)
@ -1109,7 +1114,27 @@ def test_pause_from_sync(
)
if ctlc:
do_ctlc(child)
do_ctlc(
child,
# NOTE: setting this to 0 (or some other sufficient
# small val) can cause the test to fail since the
# `subactor` suffers a race where the root/parent
# sends an actor-cancel prior to it hitting its pause
# point; by def the value is 0.1
delay=0.3,
)
# XXX, fwiw without a brief sleep here the SIGINT might actually
# trigger "subactor" cancellation by its parent before the
# shield-handler is engaged.
#
# => similar to the `delay` input to `do_ctlc()` below, setting
# this too low can cause the test to fail since the `subactor`
# suffers a race where the root/parent sends an actor-cancel
# prior to the context task hitting its pause point (and thus
# engaging the `sigint_shield()` handler in time); this value
# seems be good enuf?
time.sleep(0.6)
# one of the bg thread or subactor should have
# `Lock.acquire()`-ed
@ -1128,29 +1153,45 @@ def test_pause_from_sync(
"('root'",
],
}
conts: int = 0 # for debugging below matching logic on failure
while attach_patts:
child.sendline('c')
conts += 1
child.expect(PROMPT)
before = str(child.before.decode())
for key in attach_patts.copy():
for key in attach_patts:
if key in before:
attach_key: str = key
expected_patts: str = attach_patts.pop(key)
assert_before(
child,
[_pause_msg] + expected_patts
[_pause_msg]
+
expected_patts
)
break
else:
pytest.fail(
f'No keys found?\n\n'
f'{attach_patts.keys()}\n\n'
f'{before}\n'
)
# ensure no other task/threads engaged a REPL
# at the same time as the one that was detected above.
for key, other_patts in attach_patts.items():
for key, other_patts in attach_patts.copy().items():
assert not in_prompt_msg(
before,
other_patts,
)
if ctlc:
do_ctlc(child)
do_ctlc(
child,
patt=attach_key,
# NOTE same as comment above
delay=0.3,
)
child.sendline('c')
child.expect(pexpect.EOF)

View File

@ -26,7 +26,7 @@ from ._debug import (
breakpoint as breakpoint,
pause as pause,
pause_from_sync as pause_from_sync,
shield_sigint_handler as shield_sigint_handler,
sigint_shield as sigint_shield,
open_crash_handler as open_crash_handler,
maybe_open_crash_handler as maybe_open_crash_handler,
maybe_init_greenback as maybe_init_greenback,

View File

@ -409,9 +409,9 @@ class Lock:
repl_task
)
message += (
f'\nA non-caller task still owns this lock on behalf of '
f'{behalf_of_task}\n'
f'|_{lock_stats.owner}\n'
f'A non-caller task still owns this lock on behalf of '
f'`{behalf_of_task}`\n'
f'lock owner task: {lock_stats.owner}\n'
)
if (
@ -523,6 +523,10 @@ class Lock:
)
def get_lock() -> Lock:
return Lock
@tractor.context(
# enable the locking msgspec
pld_spec=__pld_spec__,
@ -788,13 +792,13 @@ class DebugStatus:
cls._orig_sigint_handler: Callable = trio.from_thread.run_sync(
signal.signal,
signal.SIGINT,
shield_sigint_handler,
sigint_shield,
)
else:
cls._orig_sigint_handler = signal.signal(
signal.SIGINT,
shield_sigint_handler,
sigint_shield,
)
@classmethod
@ -900,12 +904,30 @@ class DebugStatus:
# actor-local state, irrelevant for non-root.
cls.repl_task = None
# XXX WARNING needs very special caughtion, and we should
# prolly make a more explicit `@property` API?
#
# - if unset in root multi-threaded case can cause
# issues with detecting that some root thread is
# using a REPL,
#
# - what benefit is there to unsetting, it's always
# set again for the next task in some actor..
# only thing would be to avoid in the sigint-handler
# logging when we don't need to?
cls.repl = None
# restore original sigint handler
cls.unshield_sigint()
# TODO: use the new `@lowlevel.singleton` for this!
def get_debug_req() -> DebugStatus|None:
return DebugStatus
class TractorConfig(pdbp.DefaultConfig):
'''
Custom `pdbp` config which tries to use the best tradeoff
@ -1311,7 +1333,7 @@ def any_connected_locker_child() -> bool:
return False
def shield_sigint_handler(
def sigint_shield(
signum: int,
frame: 'frame', # type: ignore # noqa
*args,
@ -1351,13 +1373,17 @@ def shield_sigint_handler(
# root actor branch that reports whether or not a child
# has locked debugger.
if is_root_process():
# log.warning(
log.devx(
'Handling SIGINT in root actor\n'
f'{Lock.repr()}'
f'{DebugStatus.repr()}\n'
)
# try to see if the supposed (sub)actor in debug still
# has an active connection to *this* actor, and if not
# it's likely they aren't using the TTY lock / debugger
# and we should propagate SIGINT normally.
any_connected: bool = any_connected_locker_child()
# if not any_connected:
# return do_cancel()
problem = (
f'root {actor.uid} handling SIGINT\n'
@ -1406,19 +1432,25 @@ def shield_sigint_handler(
# an actor using the `Lock` (a bug state) ??
# => so immediately cancel any stale lock cs and revert
# the handler!
if not repl:
if not DebugStatus.repl:
# TODO: WHEN should we revert back to ``trio``
# handler if this one is stale?
# -[ ] maybe after a counts work of ctl-c mashes?
# -[ ] use a state var like `stale_handler: bool`?
problem += (
'\n'
'No subactor is using a `pdb` REPL according `Lock.ctx_in_debug`?\n'
'BUT, the root should be using it, WHY this handler ??\n'
'BUT, the root should be using it, WHY this handler ??\n\n'
'So either..\n'
'- some root-thread is using it but has no `.repl` set?, OR\n'
'- something else weird is going on outside the runtime!?\n'
)
else:
# NOTE: since we emit this msg on ctl-c, we should
# also always re-print the prompt the tail block!
log.pdb(
'Ignoring SIGINT while pdb REPL in use by root actor..\n'
f'{DebugStatus.repl_task}\n'
f' |_{repl}\n'
)
problem = None
@ -1468,7 +1500,6 @@ def shield_sigint_handler(
'Allowing SIGINT propagation..'
)
DebugStatus.unshield_sigint()
# do_cancel()
repl_task: str|None = DebugStatus.repl_task
req_task: str|None = DebugStatus.req_task
@ -1483,10 +1514,15 @@ def shield_sigint_handler(
f' |_{repl}\n'
)
elif req_task:
log.pdb(
f'Ignoring SIGINT while debug request task is open\n'
log.debug(
'Ignoring SIGINT while debug request task is open but either,\n'
'- someone else is already REPL-in and has the `Lock`, or\n'
'- some other local task already is replin?\n'
f'|_{req_task}\n'
)
# TODO can we remove this now?
# -[ ] does this path ever get hit any more?
else:
msg: str = (
'SIGINT shield handler still active BUT, \n\n'
@ -1522,31 +1558,47 @@ def shield_sigint_handler(
# https://github.com/goodboy/tractor/issues/320
# elif debug_mode():
# NOTE: currently (at least on ``fancycompleter`` 0.9.2)
# it looks to be that the last command that was run (eg. ll)
# will be repeated by default.
# maybe redraw/print last REPL output to console since
# we want to alert the user that more input is expect since
# nothing has been done dur to ignoring sigint.
if (
repl # only when current actor has a REPL engaged
DebugStatus.repl # only when current actor has a REPL engaged
):
flush_status: str = (
'Flushing stdout to ensure new prompt line!\n'
)
# XXX: yah, mega hack, but how else do we catch this madness XD
if repl.shname == 'xonsh':
if (
repl.shname == 'xonsh'
):
flush_status += (
'-> ALSO re-flushing due to `xonsh`..\n'
)
repl.stdout.write(repl.prompt)
# log.warning(
log.devx(
flush_status
)
repl.stdout.flush()
# TODO: make this work like sticky mode where if there is output
# TODO: better console UX to match the current "mode":
# -[ ] for example if in sticky mode where if there is output
# detected as written to the tty we redraw this part underneath
# and erase the past draw of this same bit above?
# repl.sticky = True
# repl._print_if_sticky()
# also see these links for an approach from ``ptk``:
# also see these links for an approach from `ptk`:
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
else:
log.devx(
# log.warning(
'Not flushing stdout since not needed?\n'
f'|_{repl}\n'
)
# XXX only for tracing this handler
log.devx('exiting SIGINT')
@ -1617,7 +1669,7 @@ async def _pause(
# 'directly (infected) `asyncio` tasks!'
# ) from rte
raise
raise rte
if debug_func is not None:
debug_func = partial(debug_func)
@ -1625,9 +1677,13 @@ async def _pause(
# XXX NOTE XXX set it here to avoid ctl-c from cancelling a debug
# request from a subactor BEFORE the REPL is entered by that
# process.
if not repl:
if (
not repl
and
debug_func
):
repl: PdbREPL = mk_pdb()
DebugStatus.shield_sigint()
repl: PdbREPL = repl or mk_pdb()
# TODO: move this into a `open_debug_request()` @acm?
# -[ ] prolly makes the most sense to do the request
@ -1662,7 +1718,13 @@ async def _pause(
# recurrent entries/requests from the same
# actor-local task.
DebugStatus.repl_task = task
if repl:
DebugStatus.repl = repl
else:
log.error(
'No REPl instance set before entering `debug_func`?\n'
f'{debug_func}\n'
)
# invoke the low-level REPL activation routine which itself
# should call into a `Pdb.set_trace()` of some sort.
@ -2001,7 +2063,7 @@ async def _pause(
DebugStatus.release(cancel_req_task=True)
# sanity checks for ^ on request/status teardown
assert DebugStatus.repl is None
# assert DebugStatus.repl is None # XXX no more bc bg thread cases?
assert DebugStatus.repl_task is None
# sanity, for when hackin on all this?
@ -2240,7 +2302,12 @@ async def _pause_from_bg_root_thread(
'Trying to acquire `Lock` on behalf of bg thread\n'
f'|_{behalf_of_thread}\n'
)
# DebugStatus.repl_task = behalf_of_thread
# NOTE: this is already a task inside the main-`trio`-thread, so
# we don't need to worry about calling it another time from the
# bg thread on which who's behalf this task is operating.
DebugStatus.shield_sigint()
out = await _pause(
debug_func=None,
repl=repl,
@ -2249,6 +2316,8 @@ async def _pause_from_bg_root_thread(
called_from_bg_thread=True,
**_pause_kwargs
)
DebugStatus.repl_task = behalf_of_thread
lock: trio.FIFOLock = Lock._debug_lock
stats: trio.LockStatistics= lock.statistics()
assert stats.owner is task
@ -2282,7 +2351,6 @@ async def _pause_from_bg_root_thread(
f'|_{behalf_of_thread}\n'
)
task_status.started(out)
DebugStatus.shield_sigint()
# wait for bg thread to exit REPL sesh.
try:
@ -2323,7 +2391,7 @@ def pause_from_sync(
err_on_no_runtime=False,
)
message: str = (
f'{actor.uid} task called `tractor.pause_from_sync()`\n\n'
f'{actor.uid} task called `tractor.pause_from_sync()`\n'
)
if not actor:
raise RuntimeError(
@ -2347,7 +2415,6 @@ def pause_from_sync(
'for infected `asyncio` mode!'
)
DebugStatus.shield_sigint()
repl: PdbREPL = mk_pdb()
# message += f'-> created local REPL {repl}\n'
@ -2365,6 +2432,10 @@ def pause_from_sync(
# thread which will call `._pause()` manually with special
# handling for root-actor caller usage.
if not DebugStatus.is_main_trio_thread():
# TODO: `threading.Lock()` this so we don't get races in
# multi-thr cases where they're acquiring/releasing the
# REPL and setting request/`Lock` state, etc..
thread: threading.Thread = threading.current_thread()
repl_owner = thread
@ -2372,9 +2443,16 @@ def pause_from_sync(
if is_root:
message += (
f'-> called from a root-actor bg {thread}\n'
f'-> scheduling `._pause_from_sync_thread()`..\n'
f'-> scheduling `._pause_from_bg_root_thread()`..\n'
)
bg_task, repl = trio.from_thread.run(
# XXX SUBTLE BADNESS XXX that should really change!
# don't over-write the `repl` here since when
# this behalf-of-bg_thread-task calls pause it will
# pass `debug_func=None` which will result in it
# returing a `repl==None` output and that get's also
# `.started(out)` back here! So instead just ignore
# that output and assign the `repl` created above!
bg_task, _ = trio.from_thread.run(
afn=partial(
actor._service_n.start,
partial(
@ -2386,8 +2464,9 @@ def pause_from_sync(
),
)
)
DebugStatus.shield_sigint()
message += (
f'-> `._pause_from_sync_thread()` started bg task {bg_task}\n'
f'-> `._pause_from_bg_root_thread()` started bg task {bg_task}\n'
)
else:
message += f'-> called from a bg {thread}\n'
@ -2396,7 +2475,7 @@ def pause_from_sync(
# `request_root_stdio_lock()` and we don't need to
# worry about all the special considerations as with
# the root-actor per above.
bg_task, repl = trio.from_thread.run(
bg_task, _ = trio.from_thread.run(
afn=partial(
_pause,
debug_func=None,
@ -2411,6 +2490,9 @@ def pause_from_sync(
**_pause_kwargs
),
)
# ?TODO? XXX where do we NEED to call this in the
# subactor-bg-thread case?
DebugStatus.shield_sigint()
assert bg_task is not DebugStatus.repl_task
else: # we are presumably the `trio.run()` + main thread
@ -2423,6 +2505,11 @@ def pause_from_sync(
# greenback: ModuleType = await maybe_init_greenback()
message += f'-> imported {greenback}\n'
# NOTE XXX seems to need to be set BEFORE the `_pause()`
# invoke using gb below?
DebugStatus.shield_sigint()
repl_owner: Task = current_task()
message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n'
try:
@ -2448,8 +2535,11 @@ def pause_from_sync(
raise
if out:
bg_task, repl = out
assert repl is repl
bg_task, _ = out
else:
bg_task: Task = current_task()
# assert repl is repl
assert bg_task is repl_owner
# NOTE: normally set inside `_enter_repl_sync()`
@ -2464,7 +2554,10 @@ def pause_from_sync(
)
log.devx(message)
# NOTE set as late as possible to avoid state clobbering
# in the multi-threaded case!
DebugStatus.repl = repl
_set_trace(
api_frame=api_frame or inspect.currentframe(),
repl=repl,
@ -2665,7 +2758,8 @@ async def acquire_debug_lock(
tuple,
]:
'''
Request to acquire the TTY `Lock` in the root actor, release on exit.
Request to acquire the TTY `Lock` in the root actor, release on
exit.
This helper is for actor's who don't actually need to acquired
the debugger but want to wait until the lock is free in the
@ -2677,10 +2771,14 @@ async def acquire_debug_lock(
yield None
return
task: Task = current_task()
async with trio.open_nursery() as n:
ctx: Context = await n.start(
partial(
request_root_stdio_lock,
subactor_uid,
actor_uid=subactor_uid,
task_uid=(task.name, id(task)),
)
)
yield ctx
ctx.cancel()