forked from goodboy/tractor
Finally, officially support shielded REPL-ing!
It's been a long time prepped and now finally implemented! Offer a `shield: bool` argument from our async `._debug` APIs: - `await tractor.pause(shield=True)`, - `await tractor.post_mortem(shield=True)` ^-These-^ can now be used inside cancelled `trio.CancelScope`s, something very handy when introspecting complex (distributed) system tear/shut-downs particularly under remote error or (inter-peer) cancellation conditions B) Thanks to previous prepping in a prior attempt and various patches from the rigorous rework of `.devx._debug` internals around typed msg specs, there ain't much that was needed! Impl deats - obvi passthrough `shield` from the public API endpoints (was already done from a prior attempt). - put ad-hoc internal `with trio.CancelScope(shield=shield):` around all checkpoints inside `._pause()` for both the root-process and subactor case branches. Add a fairly rigorous example, `examples/debugging/shielded_pause.py` with a wrapping `pexpect` test, `test_debugger.test_shield_pause()` and ensure it covers as many cases as i can think of offhand: - multiple `.pause()` entries in a loop despite parent scope cancellation in a subactor RPC task which itself spawns a sub-task. - a `trio.Nursery.parent_task` which raises, is handled and tries to enter and unshielded `.post_mortem()`, which of course internally raises `Cancelled` in a `._pause()` checkpoint, so we catch the `Cancelled` again and then debug the debugger's internal cancellation with specific checks for the particular raising checkpoint-LOC. - do ^- the latter -^ for both subactor and root cases to ensure we can debug `._pause()` itself when it tries to REPL engage from a cancelled task scope Boruntime_to_msgspec
parent
13ea500a44
commit
8ea0f08386
|
@ -0,0 +1,88 @@
|
||||||
|
import trio
|
||||||
|
import tractor
|
||||||
|
|
||||||
|
|
||||||
|
async def cancellable_pause_loop(
|
||||||
|
task_status: trio.TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED
|
||||||
|
):
|
||||||
|
with trio.CancelScope() as cs:
|
||||||
|
task_status.started(cs)
|
||||||
|
for _ in range(3):
|
||||||
|
try:
|
||||||
|
# ON first entry, there is no level triggered
|
||||||
|
# cancellation yet, so this cp does a parent task
|
||||||
|
# ctx-switch so that this scope raises for the NEXT
|
||||||
|
# checkpoint we hit.
|
||||||
|
await trio.lowlevel.checkpoint()
|
||||||
|
await tractor.pause()
|
||||||
|
|
||||||
|
cs.cancel()
|
||||||
|
|
||||||
|
# parent should have called `cs.cancel()` by now
|
||||||
|
await trio.lowlevel.checkpoint()
|
||||||
|
|
||||||
|
except trio.Cancelled:
|
||||||
|
print('INSIDE SHIELDED PAUSE')
|
||||||
|
await tractor.pause(shield=True)
|
||||||
|
else:
|
||||||
|
# should raise it again, bubbling up to parent
|
||||||
|
print('BUBBLING trio.Cancelled to parent task-nursery')
|
||||||
|
await trio.lowlevel.checkpoint()
|
||||||
|
|
||||||
|
|
||||||
|
async def pm_on_cancelled():
|
||||||
|
async with trio.open_nursery() as tn:
|
||||||
|
tn.cancel_scope.cancel()
|
||||||
|
try:
|
||||||
|
await trio.sleep_forever()
|
||||||
|
except trio.Cancelled:
|
||||||
|
# should also raise `Cancelled` since
|
||||||
|
# we didn't pass `shield=True`.
|
||||||
|
try:
|
||||||
|
await tractor.post_mortem(hide_tb=False)
|
||||||
|
except trio.Cancelled as taskc:
|
||||||
|
|
||||||
|
# should enter just fine, in fact it should
|
||||||
|
# be debugging the internals of the previous
|
||||||
|
# sin-shield call above Bo
|
||||||
|
await tractor.post_mortem(
|
||||||
|
hide_tb=False,
|
||||||
|
shield=True,
|
||||||
|
)
|
||||||
|
raise taskc
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise RuntimeError('Dint cancel as expected!?')
|
||||||
|
|
||||||
|
|
||||||
|
async def cancelled_before_pause(
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Verify that using a shielded pause works despite surrounding
|
||||||
|
cancellation called state in the calling task.
|
||||||
|
|
||||||
|
'''
|
||||||
|
async with trio.open_nursery() as tn:
|
||||||
|
cs: trio.CancelScope = await tn.start(cancellable_pause_loop)
|
||||||
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
|
assert cs.cancelled_caught
|
||||||
|
|
||||||
|
await pm_on_cancelled()
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
async with tractor.open_nursery(
|
||||||
|
debug_mode=True,
|
||||||
|
) as n:
|
||||||
|
portal: tractor.Portal = await n.run_in_actor(
|
||||||
|
cancelled_before_pause,
|
||||||
|
)
|
||||||
|
await portal.result()
|
||||||
|
|
||||||
|
# ensure the same works in the root actor!
|
||||||
|
await pm_on_cancelled()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
trio.run(main)
|
|
@ -1234,6 +1234,81 @@ def test_post_mortem_api(
|
||||||
child.expect(pexpect.EOF)
|
child.expect(pexpect.EOF)
|
||||||
|
|
||||||
|
|
||||||
|
def test_shield_pause(
|
||||||
|
spawn,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Verify the `tractor.pause()/.post_mortem()` API works inside an
|
||||||
|
already cancelled `trio.CancelScope` and that you can step to the
|
||||||
|
next checkpoint wherein the cancelled will get raised.
|
||||||
|
|
||||||
|
'''
|
||||||
|
child = spawn('shielded_pause')
|
||||||
|
|
||||||
|
# First entry is via manual `.post_mortem()`
|
||||||
|
child.expect(PROMPT)
|
||||||
|
assert_before(
|
||||||
|
child,
|
||||||
|
[
|
||||||
|
_pause_msg,
|
||||||
|
"cancellable_pause_loop'",
|
||||||
|
"('cancelled_before_pause'", # actor name
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
# since 3 tries in ex. shield pause loop
|
||||||
|
for i in range(3):
|
||||||
|
child.sendline('c')
|
||||||
|
child.expect(PROMPT)
|
||||||
|
assert_before(
|
||||||
|
child,
|
||||||
|
[
|
||||||
|
_pause_msg,
|
||||||
|
"INSIDE SHIELDED PAUSE",
|
||||||
|
"('cancelled_before_pause'", # actor name
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
# back inside parent task that opened nursery
|
||||||
|
child.sendline('c')
|
||||||
|
child.expect(PROMPT)
|
||||||
|
assert_before(
|
||||||
|
child,
|
||||||
|
[
|
||||||
|
_crash_msg,
|
||||||
|
"('cancelled_before_pause'", # actor name
|
||||||
|
"Failed to engage debugger via `_pause()`",
|
||||||
|
"trio.Cancelled",
|
||||||
|
"raise Cancelled._create()",
|
||||||
|
|
||||||
|
# we should be handling a taskc inside
|
||||||
|
# the first `.port_mortem()` sin-shield!
|
||||||
|
'await DebugStatus.req_finished.wait()',
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
# same as above but in the root actor's task
|
||||||
|
child.sendline('c')
|
||||||
|
child.expect(PROMPT)
|
||||||
|
assert_before(
|
||||||
|
child,
|
||||||
|
[
|
||||||
|
_crash_msg,
|
||||||
|
"('root'", # actor name
|
||||||
|
"Failed to engage debugger via `_pause()`",
|
||||||
|
"trio.Cancelled",
|
||||||
|
"raise Cancelled._create()",
|
||||||
|
|
||||||
|
# handling a taskc inside the first unshielded
|
||||||
|
# `.port_mortem()`.
|
||||||
|
# BUT in this case in the root-proc path ;)
|
||||||
|
'wait Lock._debug_lock.acquire()',
|
||||||
|
]
|
||||||
|
)
|
||||||
|
child.sendline('c')
|
||||||
|
child.expect(pexpect.EOF)
|
||||||
|
|
||||||
|
|
||||||
# TODO: needs ANSI code stripping tho, see `assert_before()` # above!
|
# TODO: needs ANSI code stripping tho, see `assert_before()` # above!
|
||||||
def test_correct_frames_below_hidden():
|
def test_correct_frames_below_hidden():
|
||||||
'''
|
'''
|
||||||
|
@ -1255,7 +1330,3 @@ def test_cant_pause_from_paused_task():
|
||||||
|
|
||||||
'''
|
'''
|
||||||
...
|
...
|
||||||
|
|
||||||
|
|
||||||
def test_shield_pause():
|
|
||||||
...
|
|
||||||
|
|
|
@ -1600,25 +1600,27 @@ async def _pause(
|
||||||
f'REPL: {Lock.repl}\n'
|
f'REPL: {Lock.repl}\n'
|
||||||
# TODO: use `._frame_stack` scanner to find the @api_frame
|
# TODO: use `._frame_stack` scanner to find the @api_frame
|
||||||
)
|
)
|
||||||
await trio.lowlevel.checkpoint()
|
with trio.CancelScope(shield=shield):
|
||||||
|
await trio.lowlevel.checkpoint()
|
||||||
return
|
return
|
||||||
|
|
||||||
# XXX: since we need to enter pdb synchronously below,
|
# XXX: since we need to enter pdb synchronously below,
|
||||||
# we have to release the lock manually from pdb completion
|
# we have to release the lock manually from pdb completion
|
||||||
# callbacks. Can't think of a nicer way then this atm.
|
# callbacks. Can't think of a nicer way then this atm.
|
||||||
if Lock._debug_lock.locked():
|
with trio.CancelScope(shield=shield):
|
||||||
log.warning(
|
if Lock._debug_lock.locked():
|
||||||
'attempting to shield-acquire active TTY lock owned by\n'
|
log.warning(
|
||||||
f'{ctx}'
|
'attempting to shield-acquire active TTY lock owned by\n'
|
||||||
)
|
f'{ctx}'
|
||||||
|
)
|
||||||
|
|
||||||
# must shield here to avoid hitting a ``Cancelled`` and
|
# must shield here to avoid hitting a ``Cancelled`` and
|
||||||
# a child getting stuck bc we clobbered the tty
|
# a child getting stuck bc we clobbered the tty
|
||||||
with trio.CancelScope(shield=True):
|
# with trio.CancelScope(shield=True):
|
||||||
|
await Lock._debug_lock.acquire()
|
||||||
|
else:
|
||||||
|
# may be cancelled
|
||||||
await Lock._debug_lock.acquire()
|
await Lock._debug_lock.acquire()
|
||||||
else:
|
|
||||||
# may be cancelled
|
|
||||||
await Lock._debug_lock.acquire()
|
|
||||||
|
|
||||||
# enter REPL from root, no TTY locking IPC ctx necessary
|
# enter REPL from root, no TTY locking IPC ctx necessary
|
||||||
_enter_repl_sync(debug_func)
|
_enter_repl_sync(debug_func)
|
||||||
|
@ -1659,7 +1661,8 @@ async def _pause(
|
||||||
f'{task.name}@{actor.uid} already has TTY lock\n'
|
f'{task.name}@{actor.uid} already has TTY lock\n'
|
||||||
f'ignoring..'
|
f'ignoring..'
|
||||||
)
|
)
|
||||||
await trio.lowlevel.checkpoint()
|
with trio.CancelScope(shield=shield):
|
||||||
|
await trio.lowlevel.checkpoint()
|
||||||
return
|
return
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
@ -1671,8 +1674,9 @@ async def _pause(
|
||||||
f'{task}@{actor.uid} already has TTY lock\n'
|
f'{task}@{actor.uid} already has TTY lock\n'
|
||||||
f'waiting for release..'
|
f'waiting for release..'
|
||||||
)
|
)
|
||||||
await DebugStatus.repl_release.wait()
|
with trio.CancelScope(shield=shield):
|
||||||
await trio.sleep(0.1)
|
await DebugStatus.repl_release.wait()
|
||||||
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
elif (
|
elif (
|
||||||
req_task
|
req_task
|
||||||
|
@ -1683,7 +1687,8 @@ async def _pause(
|
||||||
|
|
||||||
'Waiting for previous request to complete..\n'
|
'Waiting for previous request to complete..\n'
|
||||||
)
|
)
|
||||||
await DebugStatus.req_finished.wait()
|
with trio.CancelScope(shield=shield):
|
||||||
|
await DebugStatus.req_finished.wait()
|
||||||
|
|
||||||
# this **must** be awaited by the caller and is done using the
|
# this **must** be awaited by the caller and is done using the
|
||||||
# root nursery so that the debugger can continue to run without
|
# root nursery so that the debugger can continue to run without
|
||||||
|
@ -1721,14 +1726,15 @@ async def _pause(
|
||||||
'Starting request task\n'
|
'Starting request task\n'
|
||||||
f'|_{task}\n'
|
f'|_{task}\n'
|
||||||
)
|
)
|
||||||
req_ctx: Context = await actor._service_n.start(
|
with trio.CancelScope(shield=shield):
|
||||||
partial(
|
req_ctx: Context = await actor._service_n.start(
|
||||||
request_root_stdio_lock,
|
partial(
|
||||||
actor_uid=actor.uid,
|
request_root_stdio_lock,
|
||||||
task_uid=(task.name, id(task)), # task uuid (effectively)
|
actor_uid=actor.uid,
|
||||||
shield=shield,
|
task_uid=(task.name, id(task)), # task uuid (effectively)
|
||||||
|
shield=shield,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
|
||||||
# XXX sanity, our locker task should be the one which
|
# XXX sanity, our locker task should be the one which
|
||||||
# entered a new IPC ctx with the root actor, NOT the one
|
# entered a new IPC ctx with the root actor, NOT the one
|
||||||
# that exists around the task calling into `._pause()`.
|
# that exists around the task calling into `._pause()`.
|
||||||
|
@ -2147,6 +2153,13 @@ async def post_mortem(
|
||||||
**_pause_kwargs,
|
**_pause_kwargs,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
'''
|
||||||
|
`tractor`'s builtin async equivalient of `pdb.post_mortem()`
|
||||||
|
which can be used inside exception handlers.
|
||||||
|
|
||||||
|
It's also used for the crash handler when `debug_mode == True` ;)
|
||||||
|
|
||||||
|
'''
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
tb: TracebackType = tb or sys.exc_info()[2]
|
tb: TracebackType = tb or sys.exc_info()[2]
|
||||||
|
|
Loading…
Reference in New Issue