Catch `.pause_from_sync()` in root bg thread bugs!

Originally discovered while using `tractor.pause_from_sync()`
from the `i3ipc` client running in a bg-thread that uses `asyncio`
inside `modden`.

Turns out we definitely aren't correctly handling `.pause_from_sync()`
from the root actor when called from a `trio.to_thread.run_sync()`
bg thread:
- root-actor bg threads which can't `Lock._debug_lock.acquire()` since
  they aren't in `trio.Task`s.
- even if scheduled via `.to_thread.run_sync(_debug._pause)` the
  acquirer won't be the task/thread which calls `Lock.release()` from
  `PdbREPL` hooks; this results in a RTE raised by `trio`..
- multiple threads will step on each other's stdio since cpython's GIL
  seems to ctx switch threads on every input from the user to the REPL
  loop..

Reproduce via reworking our example and test so that they catch and fail
for all edge cases:
- rework the `/examples/debugging/sync_bp.py` example to demonstrate the
  above issues, namely the stdio clobbering in the REPL when multiple
  threads and/or a subactor try to debug simultaneously.
  |_ run one thread using a task nursery to ensure it runs conc with the
     nursery's parent task.
  |_ ensure the bg threads run conc with a subactor's usage of
     `.pause_from_sync()`.
  |_ gravely detail all the special cases inside a TODO comment.
  |_ add some control flags to `sync_pause()` helper and don't use
     `breakpoint()` by default.
- extend and adjust `test_debugger.test_pause_from_sync` to match (and
  thus currently fail) by ensuring exclusive `PdbREPL` attachment when
  the 2 bg root-actor threads are concurrently interacting alongside the
  subactor:
  |_ should only see one of the `_pause_msg` logs at a time for either
     one of the threads or the subactor.
  |_ ensure each attaches (in no particular order) before expecting the
     script to exit.
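
The "exclusive attachment" check described above can be sketched without
`pexpect`, operating on the captured pre-prompt text only. This is an
illustration, not the test suite's code: `pop_exclusive_match()` is
a hypothetical helper, and it assumes a pattern list "matches" only when
every one of its patterns is present in the output:

```python
def pop_exclusive_match(
    before: str,
    attach_patts: dict[str, list[str]],
) -> str:
    '''
    Pop and return the key of the first entry whose key appears in
    `before`, asserting that all of its patterns are present and that
    no remaining entry matched in full (which would mean 2 REPLs
    wrote to the TTY at once).

    '''
    for key in list(attach_patts):
        if key in before:
            expected: list[str] = attach_patts.pop(key)
            assert all(patt in before for patt in expected), key
            break
    else:
        raise AssertionError('No known thread/actor attached?')

    # every other not-yet-attached thread/actor must be absent
    for other_key, other_patts in attach_patts.items():
        assert not all(
            patt in before for patt in other_patts
        ), f'REPL clobbered by {other_key}'

    return key


if __name__ == '__main__':
    patts: dict[str, list[str]] = {
        'subactor': ["'start_n_sync_pause'", "('subactor'"],
        'inline_root_bg_thread': [
            "<Thread(inline_root_bg_thread",
            "('root'",
        ],
    }
    output: str = "<Thread(inline_root_bg_thread, started)>@('root', 'x')"
    print(pop_exclusive_match(output, patts))
```

Calling it once per prompt until the dict is empty mirrors the "each
attaches, in no particular order" requirement.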

Impl adjustments to `.devx._debug`:
- drop `Lock.repl`, no longer used.
- add `Lock._owned_by_root: bool` for the `.ctx_in_debug == None`
  root-actor-task active case.
- always `log.exception()` for any `._debug_lock.release()` ownership
  RTE emitted by `trio`, like we used to..
- add special `Lock.release()` log message for the stale lock but
  `._owned_by_root == True` case; oh yeah and actually
  `log.devx(message)`..
- rename `Lock.acquire()` -> `.acquire_for_ctx()` since it's only ever
  used from subactor IPC usage; well that and for local root-task
  usage we should prolly add a `.acquire_from_root_task()`?
- buncha `._pause()` impl improvements:
 |_ type `._pause()`'s `debug_func` as a `partial` as well.
 |_ offer `called_from_sync: bool` and `called_from_bg_thread: bool`
    for the special case handling when called from `.pause_from_sync()`
 |_ only set `DebugStatus.repl/repl_task` when `debug_func != None`
   (OW ensure the `.repl_task` is not the current one).
 |_ handle error logging even when `debug_func is None`..
 |_ lotsa detailed commentary around root-actor-bg-thread special cases.
- when `._set_trace(hide_tb=False)` do `pdbp.set_trace(frame=currentframe())`
  so the `._debug` internal frames are always included.
- by default always hide tracebacks for `.pause[_from_sync]()` internals.
- improve `.pause_from_sync()` to avoid root-bg-thread crashes:
 |_ pass new `called_from_xxx_` flags and ensure `DebugStatus.repl_task`
    is actually set to the `threading.current_thread()` when needed.
 |_ manually call `Lock._debug_lock.acquire_nowait()` for the non-bg
    thread case.
 |_ TODO: still need to implement the bg-thread case using a bg
    `trio.Task`-in-thread with a `trio.Event` set by thread REPL exit.
runtime_to_msgspec
Tyler Goodlet 2024-06-06 16:14:58 -04:00
parent f0342d6ae3
commit 408a74784e
3 changed files with 323 additions and 114 deletions

@ -1,15 +1,32 @@
from functools import partial
import time
from threading import current_thread
import trio
import tractor
def sync_pause(
use_builtin: bool = True,
use_builtin: bool = False,
error: bool = False,
hide_tb: bool = True,
pre_sleep: float|None = None,
):
if pre_sleep:
time.sleep(pre_sleep)
if use_builtin:
breakpoint(hide_tb=False)
print(
f'Entering `breakpoint()` from\n'
f'{current_thread()}\n'
)
breakpoint(hide_tb=hide_tb)
else:
print(
f'Entering `tractor.pause_from_sync()` from\n'
f'{current_thread()}@{tractor.current_actor().uid}\n'
)
tractor.pause_from_sync()
if error:
@ -25,44 +42,114 @@ async def start_n_sync_pause(
# sync to parent-side task
await ctx.started()
print(f'entering SYNC PAUSE in {actor.uid}')
print(f'Entering `sync_pause()` in subactor: {actor.uid}\n')
sync_pause()
print(f'back from SYNC PAUSE in {actor.uid}')
print(f'Exited `sync_pause()` in subactor: {actor.uid}\n')
async def main() -> None:
async with tractor.open_nursery(
# NOTE: required for pausing from sync funcs
maybe_enable_greenback=True,
debug_mode=True,
) as an:
async with (
tractor.open_nursery(
# NOTE: required for pausing from sync funcs
maybe_enable_greenback=True,
debug_mode=True,
# loglevel='cancel',
) as an,
trio.open_nursery() as tn,
):
# just from root task
sync_pause()
p: tractor.Portal = await an.start_actor(
'subactor',
enable_modules=[__name__],
# infect_asyncio=True,
debug_mode=True,
loglevel='cancel',
)
# TODO: 3 sub-actor usage cases:
# -[x] via a `.open_context()`
# -[ ] via a `.run_in_actor()` call
# -[ ] via a `.run()`
# -[ ] via a `.open_context()`
#
# -[ ] via a `.to_thread.run_sync()` in subactor
async with p.open_context(
start_n_sync_pause,
) as (ctx, first):
assert first is None
await tractor.pause()
sync_pause()
# TODO: handle bg-thread-in-root-actor special cases!
#
# there are a couple very subtle situations possible here
# and they are likely to become more important as cpython
# moves to support no-GIL.
#
# Cases:
# 1. root-actor bg-threads that call `.pause_from_sync()`
# whilst an in-tree subactor also is using `.pause()`.
# |_ since the root-actor bg thread can not
# `Lock._debug_lock.acquire_nowait()` without running
# a `trio.Task`, AND because the
# `PdbREPL.set_continue()` is called from that
# bg-thread, we can not `._debug_lock.release()`
# either!
# |_ this results in no actor-tree `Lock` being used
# on behalf of the bg-thread and thus the subactor's
# task and the thread trying to use stdio
# simultaneously which results in the classic TTY
# clobbering!
#
# 2. multiple sync-bg-threads that call
# `.pause_from_sync()` where one is scheduled via
# `Nursery.start_soon(to_thread.run_sync)` in a bg
# task.
#
# Due to the GIL, the threads never truly try to step
# through the REPL simultaneously, BUT their `logging`
# and traceback outputs are interleaved since the GIL
# (seemingly) on every REPL-input from the user
# switches threads..
#
# Soo, the context switching semantics of the GIL
# result in a very confusing and messy interaction UX
# since eval and (tb) print output is NOT synced to
# each REPL-cycle (like we normally make it via
# a `.set_continue()` callback triggering the
# `Lock.release()`). Ideally we can solve this
# usability issue NOW because this will of course be
# that much more important when eventually there is no
# GIL!
# TODO: make this work!!
await trio.to_thread.run_sync(
sync_pause,
abandon_on_cancel=False,
)
# XXX should cause double REPL entry and thus TTY
# clobbering due to case 1. above!
tn.start_soon(
partial(
trio.to_thread.run_sync,
partial(
sync_pause,
use_builtin=False,
# pre_sleep=0.5,
),
abandon_on_cancel=True,
thread_name='start_soon_root_bg_thread',
)
)
await tractor.pause()
# XXX should cause double REPL entry and thus TTY
# clobbering due to case 2. above!
await trio.to_thread.run_sync(
partial(
sync_pause,
# NOTE this already works fine since in the new
# thread the `breakpoint()` built-in is never
# overloaded, thus NO locking is used, HOWEVER
# the case 2. from above still exists!
use_builtin=True,
),
abandon_on_cancel=False,
thread_name='inline_root_bg_thread',
)
await ctx.cancel()

@ -1073,6 +1073,8 @@ def test_pause_from_sync(
'''
child = spawn('sync_bp')
# first `sync_pause()` after nurseries open
child.expect(PROMPT)
assert_before(
child,
@ -1087,43 +1089,70 @@ def test_pause_from_sync(
do_ctlc(child)
child.sendline('c')
# first `await tractor.pause()` inside `p.open_context()` body
child.expect(PROMPT)
# XXX shouldn't see gb loaded again
# XXX shouldn't see gb loaded message with PDB loglevel!
before = str(child.before.decode())
assert not in_prompt_msg(
before,
['`greenback` portal opened!'],
)
# should be same root task
assert_before(
child,
[_pause_msg, "('root'",],
[
_pause_msg,
"<Task '__main__.main'",
"('root'",
]
)
if ctlc:
do_ctlc(child)
child.sendline('c')
child.expect(PROMPT)
assert_before(
child,
[_pause_msg, "('subactor'",],
)
# one of the bg thread or subactor should have
# `Lock.acquire()`-ed
# (NOT both, which will result in REPL clobbering!)
attach_patts: dict[str, list[str]] = {
'subactor': [
"'start_n_sync_pause'",
"('subactor'",
],
'inline_root_bg_thread': [
"<Thread(inline_root_bg_thread",
"('root'",
],
'start_soon_root_bg_thread': [
"<Thread(start_soon_root_bg_thread",
"('root'",
],
}
while attach_patts:
child.sendline('c')
child.expect(PROMPT)
before = str(child.before.decode())
for key in attach_patts.copy():
if key in before:
expected_patts: list[str] = attach_patts.pop(key)
assert_before(
child,
[_pause_msg] + expected_patts
)
break
if ctlc:
do_ctlc(child)
# ensure no other task/threads engaged a REPL
# at the same time as the one that was detected above.
for key, other_patts in attach_patts.items():
assert not in_prompt_msg(
before,
other_patts,
)
child.sendline('c')
child.expect(PROMPT)
# non-main thread case
# TODO: should we augment the pre-prompt msg in this case?
assert_before(
child,
[_pause_msg, "('root'",],
)
if ctlc:
do_ctlc(child)
if ctlc:
do_ctlc(child)
child.sendline('c')
child.expect(pexpect.EOF)

@ -182,6 +182,8 @@ class LockRelease(
__pld_spec__: TypeAlias = LockStatus|LockRelease
# TODO: instantiate this only in root from factory
# so as to allow runtime errors from subactors.
class Lock:
'''
Actor-tree-global debug lock state, exists only in a root process.
@ -189,12 +191,6 @@ class Lock:
Mostly to avoid a lot of global declarations for now XD.
'''
# XXX local ref to the `Pbp` instance, ONLY set in the
# actor-process that currently has activated a REPL
# i.e. it will be `None` (unset) in any other actor-process
# that does not have this lock acquired in the root proc.
repl: PdbREPL|None = None
@staticmethod
def get_locking_task_cs() -> CancelScope|None:
if not is_root_process():
@ -223,6 +219,7 @@ class Lock:
ctx_in_debug: Context|None = None
req_handler_finished: trio.Event|None = None
_owned_by_root: bool = False
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
_blocked: set[
tuple[str, str] # `Actor.uid` for per actor
@ -231,23 +228,16 @@ class Lock:
@classmethod
def repr(cls) -> str:
# both root and subs
lock_stats: trio.LockStatistics = cls._debug_lock.statistics()
fields: str = (
f'repl: {cls.repl}\n'
f'req_handler_finished: {cls.req_handler_finished}\n'
f'_blocked: {cls._blocked}\n\n'
f'_debug_lock: {cls._debug_lock}\n'
f'lock_stats: {lock_stats}\n'
f'ctx_in_debug: {cls.ctx_in_debug}\n'
)
if is_root_process():
lock_stats: trio.LockStatistics = cls._debug_lock.statistics()
fields += (
f'req_handler_finished: {cls.req_handler_finished}\n'
f'_blocked: {cls._blocked}\n\n'
f'_debug_lock: {cls._debug_lock}\n'
f'lock_stats: {lock_stats}\n'
)
body: str = textwrap.indent(
fields,
prefix=' |_',
@ -256,8 +246,6 @@ class Lock:
f'<{cls.__name__}(\n'
f'{body}'
')>\n\n'
f'{cls.ctx_in_debug}\n'
)
@classmethod
@ -266,7 +254,10 @@ class Lock:
cls,
force: bool = False,
):
message: str = 'TTY lock not held by any child\n'
if not cls._owned_by_root:
message: str = 'TTY lock not held by any child\n'
else:
message: str = 'TTY lock held in root-actor task\n'
if not (is_trio_main := DebugStatus.is_main_trio_thread()):
task: threading.Thread = threading.current_thread()
@ -279,8 +270,20 @@ class Lock:
if (
lock.locked()
and
owner is task
# ^-NOTE-^ if not will raise a RTE..
(
owner is task
# or
# cls._owned_by_root
)
# ^-NOTE-^ if we do NOT ensure this, `trio` will
# raise a RTE when a non-owner tries to release the
# lock.
#
# Further we need to be extra pedantic about the
# correct task, greenback-spawned-task and/or thread
# being set to the `.repl_task` such that the above
# condition matches and we actually release the lock.
# This is of particular note for `.pause_from_sync()`!
):
if not is_trio_main:
trio.from_thread.run_sync(
@ -290,6 +293,10 @@ class Lock:
cls._debug_lock.release()
message: str = 'TTY lock released for child\n'
except RuntimeError as rte:
log.exception('Failed to release `Lock`?')
raise rte
finally:
# IFF there are no more requesting tasks queued up, fire the
# "tty-unlocked" event thereby alerting any monitors of the lock that
@ -305,7 +312,11 @@ class Lock:
):
message += '-> No more child ctx tasks hold the TTY lock!\n'
elif req_handler_finished:
elif (
req_handler_finished
and
lock.locked()
):
req_stats = req_handler_finished.statistics()
message += (
f'-> A child ctx task still owns the `Lock` ??\n'
@ -315,9 +326,20 @@ class Lock:
cls.ctx_in_debug = None
if (
cls._owned_by_root
):
if not lock.locked():
cls._owned_by_root = False
else:
message += 'Lock still held by root actor task?!?\n'
lock.release()
log.devx(message)
@classmethod
@acm
async def acquire(
async def acquire_for_ctx(
cls,
ctx: Context,
@ -372,7 +394,7 @@ class Lock:
)
# NOTE: critical section: this yield is unshielded!
#
# IF we received a cancel during the shielded lock entry of some
# next-in-queue requesting task, then the resumption here will
# result in that ``trio.Cancelled`` being raised to our caller
@ -384,7 +406,7 @@ class Lock:
yield cls._debug_lock
finally:
message :str = 'Exiting `Lock.acquire()` on behalf of sub-actor\n'
message :str = 'Exiting `Lock.acquire_for_ctx()` on behalf of sub-actor\n'
if we_acquired:
message += '-> TTY lock released by child\n'
cls.release()
@ -468,11 +490,11 @@ async def lock_tty_for_child(
# TODO: use `.msg._ops.maybe_limit_plds()` here instead so we
# can merge into a single async with, with the
# `Lock.acquire()` enter below?
# `Lock.acquire_for_ctx()` enter below?
#
# enable the locking msgspec
with apply_debug_pldec():
async with Lock.acquire(ctx=ctx):
async with Lock.acquire_for_ctx(ctx=ctx):
debug_lock_cs.shield = True
log.devx(
@ -567,6 +589,11 @@ class DebugStatus:
whenever a local task is an active REPL.
'''
# XXX local ref to the `pdbp.Pbp` instance, ONLY set in the
# actor-process that currently has activated a REPL i.e. it
# should be `None` (unset) in any other actor-process that does
# not yet have the `Lock` acquired via a root-actor debugger
# request.
repl: PdbREPL|None = None
# TODO: yet again this looks like a task outcome where we need
@ -1443,7 +1470,7 @@ class DebugRequestError(RuntimeError):
async def _pause(
debug_func: Callable|None,
debug_func: Callable|partial|None,
# NOTE: must be passed in the `.pause_from_sync()` case!
repl: PdbREPL|None = None,
@ -1457,7 +1484,9 @@ async def _pause(
# be no way to override it?..
#
shield: bool = False,
hide_tb: bool = False,
hide_tb: bool = True,
called_from_sync: bool = False,
called_from_bg_thread: bool = False,
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
**debug_func_kwargs,
@ -1502,27 +1531,15 @@ async def _pause(
# -[ ] factor out better, main reason for it is common logic for
# both root and sub repl entry
def _enter_repl_sync(
debug_func: Callable,
debug_func: partial[None],
) -> None:
__tracebackhide__: bool = hide_tb
debug_func_name: str = (
debug_func.func.__name__ if debug_func else 'None'
)
try:
# set local actor task to avoid recurrent
# entries/requests from the same local task (to the root
# process).
DebugStatus.repl_task = task
DebugStatus.repl = repl
# TODO: do we want to support using this **just** for the
# locking / common code (prolly to help address #320)?
if debug_func is None:
task_status.started(DebugStatus)
else:
log.warning(
'Entering REPL for task fuck you!\n'
f'{task}\n'
)
if debug_func:
# block here one (at the appropriate frame *up*) where
# ``breakpoint()`` was awaited and begin handling stdio.
log.devx(
@ -1531,6 +1548,12 @@ async def _pause(
f' |_{task}\n'
)
# set local actor task to avoid recurrent
# entries/requests from the same local task (to the root
# process).
DebugStatus.repl = repl
DebugStatus.repl_task = task
# invoke the low-level REPL activation routine which itself
# should call into a `Pdb.set_trace()` of some sort.
debug_func(
@ -1539,10 +1562,27 @@ async def _pause(
**debug_func_kwargs,
)
# TODO: maybe invert this logic and instead
# do `assert debug_func is None` when
# `called_from_sync`?
else:
if (
called_from_sync
# and
# is_root_process()
and
not DebugStatus.is_main_trio_thread()
):
assert DebugStatus.repl_task is not task
# TODO: do we want to support using this **just** for the
# locking / common code (prolly to help address #320)?
task_status.started(DebugStatus)
except trio.Cancelled:
log.exception(
'Cancelled during invoke of internal `debug_func = '
f'{debug_func.func.__name__}`\n'
'Cancelled during invoke of internal\n\n'
f'`debug_func = {debug_func_name}`\n'
)
# XXX NOTE: DON'T release lock yet
raise
@ -1550,8 +1590,8 @@ async def _pause(
except BaseException:
__tracebackhide__: bool = False
log.exception(
'Failed to invoke internal `debug_func = '
f'{debug_func.func.__name__}`\n'
'Failed to invoke internal\n\n'
f'`debug_func = {debug_func_name}`\n'
)
# NOTE: OW this is ONLY called from the
# `.set_continue/next` hooks!
@ -1597,34 +1637,56 @@ async def _pause(
f'This root actor task is already within an active REPL session\n'
f'Ignoring this re-entered `tractor.pause()`\n'
f'task: {task.name}\n'
f'REPL: {Lock.repl}\n'
# TODO: use `._frame_stack` scanner to find the @api_frame
)
with trio.CancelScope(shield=shield):
await trio.lowlevel.checkpoint()
return
# XXX: since we need to enter pdb synchronously below,
# we have to release the lock manually from pdb completion
# callbacks. Can't think of a nicer way than this atm.
# must shield here to avoid hitting a `Cancelled` and
# a child getting stuck bc we clobbered the tty
with trio.CancelScope(shield=shield):
if Lock._debug_lock.locked():
log.warning(
'attempting to shield-acquire active TTY lock owned by\n'
acq_prefix: str = 'shield-' if shield else ''
ctx_line: str = (
'lock owned by ctx\n\n'
f'{ctx}'
) if ctx else 'stale lock with no request ctx!?'
log.devx(
f'attempting to {acq_prefix}acquire active TTY '
f'{ctx_line}'
)
# must shield here to avoid hitting a ``Cancelled`` and
# a child getting stuck bc we clobbered the tty
# with trio.CancelScope(shield=True):
await Lock._debug_lock.acquire()
else:
# may be cancelled
# XXX: since we need to enter pdb synchronously below,
# and we don't want to block the thread that starts
# stepping through the application thread, we later
# must `Lock._debug_lock.release()` manually from
# some `PdbREPL` completion callback(`.set_[continue/exit]()`).
#
# So, when `._pause()` is called from a (bg/non-trio)
# thread, special provisions are needed and we need
# to do the `.acquire()`/`.release()` calls from
# a common `trio.Task` (due to internal impl of
# `FIFOLock`). Thus we do not acquire here and
# instead expect `.pause_from_sync()` to take care of
# this detail depending on the caller's (threading)
# usage.
#
# NOTE that this special case is ONLY required when
# using `.pause_from_sync()` from the root actor
# since OW a subactor will instead make an IPC
# request (in the branch below) to acquire the
# `Lock`-mutex and a common root-actor RPC task will
# take care of `._debug_lock` mgmt!
if not called_from_sync:
await Lock._debug_lock.acquire()
Lock._owned_by_root = True
# enter REPL from root, no TTY locking IPC ctx necessary
# since we can acquire the `Lock._debug_lock` directly in
# thread.
_enter_repl_sync(debug_func)
return # next branch is mutex and for subactors
# TODO: need a more robust check for the "root" actor
elif (
@ -1843,6 +1905,11 @@ def _set_trace(
# called our API.
caller_frame: FrameType = api_frame.f_back # type: ignore
# pretend this frame is the caller frame to show
# the entire call-stack all the way down to here.
if not hide_tb:
caller_frame: FrameType = inspect.currentframe()
# engage ze REPL
# B~()
repl.set_trace(frame=caller_frame)
@ -1850,7 +1917,7 @@ def _set_trace(
async def pause(
*,
hide_tb: bool = False,
hide_tb: bool = True,
api_frame: FrameType|None = None,
# TODO: figure out how to still make this work:
@ -1970,13 +2037,12 @@ async def maybe_init_greenback(
# runtime aware version which takes care of all .
def pause_from_sync(
hide_tb: bool = False,
# proxied to `_pause()`
hide_tb: bool = True,
**_pause_kwargs,
# for eg.
# proxy to `._pause()`, for ex:
# shield: bool = False,
# api_frame: FrameType|None = None,
**_pause_kwargs,
) -> None:
@ -2020,26 +2086,53 @@ def pause_from_sync(
# noop: non-cancelled `.to_thread`
# `trio.Cancelled`: cancelled `.to_thread`
#
log.warning(
'Engaging `.pause_from_sync()` from ANOTHER THREAD!'
)
task: threading.Thread = threading.current_thread()
DebugStatus.repl_task: str = task
# TODO: make root-actor bg thread usage work!
# if is_root_process():
# async def _pause_from_sync_thread():
# ...
# else:
# .. the below ..
trio.from_thread.run(
partial(
_pause,
debug_func=None,
repl=mdb,
hide_tb=hide_tb,
# XXX to prevent `._pause()` from setting
# `DebugStatus.repl_task` to the gb task!
called_from_sync=True,
called_from_bg_thread=True,
**_pause_kwargs
),
)
task: threading.Thread = threading.current_thread()
else: # we are presumably the `trio.run()` + main thread
task: trio.Task = current_task()
DebugStatus.repl_task: str = task
greenback.await_(
_pause(
debug_func=None,
repl=mdb,
hide_tb=hide_tb,
called_from_sync=True,
**_pause_kwargs,
)
)
DebugStatus.repl_task: str = current_task()
if is_root_process():
# Manually acquire since otherwise on release we'll
# get a RTE raised by `trio` due to ownership..
Lock._debug_lock.acquire_nowait()
Lock._owned_by_root = True
# TODO: ensure we aggressively make the user aware about
# entering the global ``breakpoint()`` built-in from sync