forked from goodboy/tractor
				
			Finally, officially support shielded REPL-ing!
It's been a long time prepped and now finally implemented!

Offer a `shield: bool` argument from our async `._debug` APIs:
- `await tractor.pause(shield=True)`,
- `await tractor.post_mortem(shield=True)`

^-These-^ can now be used inside cancelled `trio.CancelScope`s, something very handy when introspecting complex (distributed) system tear/shut-downs, particularly under remote error or (inter-peer) cancellation conditions B)

Thanks to previous prepping in a prior attempt and various patches from the rigorous rework of `.devx._debug` internals around typed msg specs, there ain't much that was needed!

Impl deats
- obvi passthrough `shield` from the public API endpoints (was already done from a prior attempt).
- put ad-hoc internal `with trio.CancelScope(shield=shield):` around all checkpoints inside `._pause()` for both the root-process and subactor case branches.

Add a fairly rigorous example, `examples/debugging/shielded_pause.py`, with a wrapping `pexpect` test, `test_debugger.test_shield_pause()`, and ensure it covers as many cases as i can think of offhand:
- multiple `.pause()` entries in a loop despite parent scope cancellation in a subactor RPC task which itself spawns a sub-task.
- a `trio.Nursery.parent_task` which raises, is handled and tries to enter an unshielded `.post_mortem()`, which of course internally raises `Cancelled` in a `._pause()` checkpoint, so we catch the `Cancelled` again and then debug the debugger's internal cancellation with specific checks for the particular raising checkpoint-LOC.
- do ^- the latter -^ for both subactor and root cases to ensure we can debug `._pause()` itself when it tries to REPL engage from a cancelled task scope Bo

remotes/1757153874605917753/main
							parent
							
								
									d98f06314d
								
							
						
					
					
						commit
						15a47dc4f7
					
				|  | @ -0,0 +1,88 @@ | |||
| import trio | ||||
| import tractor | ||||
| 
 | ||||
| 
 | ||||
| async def cancellable_pause_loop( | ||||
|     task_status: trio.TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED | ||||
| ): | ||||
|     with trio.CancelScope() as cs: | ||||
|         task_status.started(cs) | ||||
|         for _ in range(3): | ||||
|             try: | ||||
|                 # ON first entry, there is no level triggered | ||||
|                 # cancellation yet, so this cp does a parent task | ||||
|                 # ctx-switch so that this scope raises for the NEXT | ||||
|                 # checkpoint we hit. | ||||
|                 await trio.lowlevel.checkpoint() | ||||
|                 await tractor.pause() | ||||
| 
 | ||||
|                 cs.cancel() | ||||
| 
 | ||||
|                 # `cs.cancel()` was just called above, so this next | ||||
|                 # checkpoint should raise `trio.Cancelled`. | ||||
|                 await trio.lowlevel.checkpoint() | ||||
| 
 | ||||
|             except trio.Cancelled: | ||||
|                 print('INSIDE SHIELDED PAUSE') | ||||
|                 await tractor.pause(shield=True) | ||||
|         else: | ||||
|             # should raise it again, bubbling up to parent | ||||
|             print('BUBBLING trio.Cancelled to parent task-nursery') | ||||
|             await trio.lowlevel.checkpoint() | ||||
| 
 | ||||
| 
 | ||||
| async def pm_on_cancelled(): | ||||
|     async with trio.open_nursery() as tn: | ||||
|         tn.cancel_scope.cancel() | ||||
|         try: | ||||
|             await trio.sleep_forever() | ||||
|         except trio.Cancelled: | ||||
|             # should also raise `Cancelled` since | ||||
|             # we didn't pass `shield=True`. | ||||
|             try: | ||||
|                 await tractor.post_mortem(hide_tb=False) | ||||
|             except trio.Cancelled as taskc: | ||||
| 
 | ||||
|                 # should enter just fine, in fact it should | ||||
|                 # be debugging the internals of the previous | ||||
|                 # sin-shield call above Bo | ||||
|                 await tractor.post_mortem( | ||||
|                     hide_tb=False, | ||||
|                     shield=True, | ||||
|                 ) | ||||
|                 raise taskc | ||||
| 
 | ||||
|         else: | ||||
|             raise RuntimeError('Dint cancel as expected!?') | ||||
| 
 | ||||
| 
 | ||||
| async def cancelled_before_pause( | ||||
| ): | ||||
|     ''' | ||||
|     Verify that using a shielded pause works despite surrounding | ||||
|     cancellation called state in the calling task. | ||||
| 
 | ||||
|     ''' | ||||
|     async with trio.open_nursery() as tn: | ||||
|         cs: trio.CancelScope = await tn.start(cancellable_pause_loop) | ||||
|         await trio.sleep(0.1) | ||||
| 
 | ||||
|     assert cs.cancelled_caught | ||||
| 
 | ||||
|     await pm_on_cancelled() | ||||
| 
 | ||||
| 
 | ||||
| async def main(): | ||||
|     async with tractor.open_nursery( | ||||
|         debug_mode=True, | ||||
|     ) as n: | ||||
|         portal: tractor.Portal = await n.run_in_actor( | ||||
|             cancelled_before_pause, | ||||
|         ) | ||||
|         await portal.result() | ||||
| 
 | ||||
|         # ensure the same works in the root actor! | ||||
|         await pm_on_cancelled() | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|     trio.run(main) | ||||
|  | @ -1232,6 +1232,81 @@ def test_post_mortem_api( | |||
|     child.expect(pexpect.EOF) | ||||
| 
 | ||||
| 
 | ||||
| def test_shield_pause( | ||||
|     spawn, | ||||
| ): | ||||
|     ''' | ||||
|     Verify the `tractor.pause()/.post_mortem()` API works inside an | ||||
|     already cancelled `trio.CancelScope` and that you can step to the | ||||
|     next checkpoint wherein the cancelled will get raised. | ||||
| 
 | ||||
|     ''' | ||||
|     child = spawn('shielded_pause') | ||||
| 
 | ||||
|     # First entry is via the manual (unshielded) `.pause()` | ||||
|     child.expect(PROMPT) | ||||
|     assert_before( | ||||
|         child, | ||||
|         [ | ||||
|             _pause_msg, | ||||
|             "cancellable_pause_loop'", | ||||
|             "('cancelled_before_pause'",  # actor name | ||||
|         ] | ||||
|     ) | ||||
| 
 | ||||
|     # since 3 tries in ex. shield pause loop | ||||
|     for i in range(3): | ||||
|         child.sendline('c') | ||||
|         child.expect(PROMPT) | ||||
|         assert_before( | ||||
|             child, | ||||
|             [ | ||||
|                 _pause_msg, | ||||
|                 "INSIDE SHIELDED PAUSE", | ||||
|                 "('cancelled_before_pause'",  # actor name | ||||
|             ] | ||||
|         ) | ||||
| 
 | ||||
|     # back inside parent task that opened nursery | ||||
|     child.sendline('c') | ||||
|     child.expect(PROMPT) | ||||
|     assert_before( | ||||
|         child, | ||||
|         [ | ||||
|             _crash_msg, | ||||
|             "('cancelled_before_pause'",  # actor name | ||||
|             "Failed to engage debugger via `_pause()`", | ||||
|             "trio.Cancelled", | ||||
|             "raise Cancelled._create()", | ||||
| 
 | ||||
|             # we should be handling a taskc inside | ||||
|             # the first `.post_mortem()` sin-shield! | ||||
|             'await DebugStatus.req_finished.wait()', | ||||
|         ] | ||||
|     ) | ||||
| 
 | ||||
|     # same as above but in the root actor's task | ||||
|     child.sendline('c') | ||||
|     child.expect(PROMPT) | ||||
|     assert_before( | ||||
|         child, | ||||
|         [ | ||||
|             _crash_msg, | ||||
|             "('root'",  # actor name | ||||
|             "Failed to engage debugger via `_pause()`", | ||||
|             "trio.Cancelled", | ||||
|             "raise Cancelled._create()", | ||||
| 
 | ||||
|             # handling a taskc inside the first unshielded | ||||
|             # `.post_mortem()`. | ||||
|             # BUT in this case in the root-proc path ;) | ||||
|             'wait Lock._debug_lock.acquire()', | ||||
|         ] | ||||
|     ) | ||||
|     child.sendline('c') | ||||
|     child.expect(pexpect.EOF) | ||||
| 
 | ||||
| 
 | ||||
| # TODO: needs ANSI code stripping tho, see `assert_before()` above! | ||||
| def test_correct_frames_below_hidden(): | ||||
|     ''' | ||||
|  | @ -1253,7 +1328,3 @@ def test_cant_pause_from_paused_task(): | |||
| 
 | ||||
|     ''' | ||||
|     ... | ||||
| 
 | ||||
| 
 | ||||
| def test_shield_pause(): | ||||
|     ... | ||||
|  |  | |||
|  | @ -1600,25 +1600,27 @@ async def _pause( | |||
|                     f'REPL: {Lock.repl}\n' | ||||
|                     # TODO: use `._frame_stack` scanner to find the @api_frame | ||||
|                 ) | ||||
|                 await trio.lowlevel.checkpoint() | ||||
|                 with trio.CancelScope(shield=shield): | ||||
|                     await trio.lowlevel.checkpoint() | ||||
|                 return | ||||
| 
 | ||||
|             # XXX: since we need to enter pdb synchronously below, | ||||
|             # we have to release the lock manually from pdb completion | ||||
|             # callbacks. Can't think of a nicer way then this atm. | ||||
|             if Lock._debug_lock.locked(): | ||||
|                 log.warning( | ||||
|                     'attempting to shield-acquire active TTY lock owned by\n' | ||||
|                     f'{ctx}' | ||||
|                 ) | ||||
|             with trio.CancelScope(shield=shield): | ||||
|                 if Lock._debug_lock.locked(): | ||||
|                     log.warning( | ||||
|                         'attempting to shield-acquire active TTY lock owned by\n' | ||||
|                         f'{ctx}' | ||||
|                     ) | ||||
| 
 | ||||
|                 # must shield here to avoid hitting a ``Cancelled`` and | ||||
|                 # a child getting stuck bc we clobbered the tty | ||||
|                 with trio.CancelScope(shield=True): | ||||
|                     # must shield here to avoid hitting a ``Cancelled`` and | ||||
|                     # a child getting stuck bc we clobbered the tty | ||||
|                     # with trio.CancelScope(shield=True): | ||||
|                     await Lock._debug_lock.acquire() | ||||
|                 else: | ||||
|                     # may be cancelled | ||||
|                     await Lock._debug_lock.acquire() | ||||
|             else: | ||||
|                 # may be cancelled | ||||
|                 await Lock._debug_lock.acquire() | ||||
| 
 | ||||
|             # enter REPL from root, no TTY locking IPC ctx necessary | ||||
|             _enter_repl_sync(debug_func) | ||||
|  | @ -1659,7 +1661,8 @@ async def _pause( | |||
|                             f'{task.name}@{actor.uid} already has TTY lock\n' | ||||
|                             f'ignoring..' | ||||
|                         ) | ||||
|                         await trio.lowlevel.checkpoint() | ||||
|                         with trio.CancelScope(shield=shield): | ||||
|                             await trio.lowlevel.checkpoint() | ||||
|                         return | ||||
| 
 | ||||
|                     else: | ||||
|  | @ -1671,8 +1674,9 @@ async def _pause( | |||
|                             f'{task}@{actor.uid} already has TTY lock\n' | ||||
|                             f'waiting for release..' | ||||
|                         ) | ||||
|                         await DebugStatus.repl_release.wait() | ||||
|                         await trio.sleep(0.1) | ||||
|                         with trio.CancelScope(shield=shield): | ||||
|                             await DebugStatus.repl_release.wait() | ||||
|                             await trio.sleep(0.1) | ||||
| 
 | ||||
|                 elif ( | ||||
|                     req_task | ||||
|  | @ -1683,7 +1687,8 @@ async def _pause( | |||
| 
 | ||||
|                         'Waiting for previous request to complete..\n' | ||||
|                     ) | ||||
|                     await DebugStatus.req_finished.wait() | ||||
|                     with trio.CancelScope(shield=shield): | ||||
|                         await DebugStatus.req_finished.wait() | ||||
| 
 | ||||
|             # this **must** be awaited by the caller and is done using the | ||||
|             # root nursery so that the debugger can continue to run without | ||||
|  | @ -1721,14 +1726,15 @@ async def _pause( | |||
|                 'Starting request task\n' | ||||
|                 f'|_{task}\n' | ||||
|             ) | ||||
|             req_ctx: Context = await actor._service_n.start( | ||||
|                 partial( | ||||
|                     request_root_stdio_lock, | ||||
|                     actor_uid=actor.uid, | ||||
|                     task_uid=(task.name, id(task)),  # task uuid (effectively) | ||||
|                     shield=shield, | ||||
|             with trio.CancelScope(shield=shield): | ||||
|                 req_ctx: Context = await actor._service_n.start( | ||||
|                     partial( | ||||
|                         request_root_stdio_lock, | ||||
|                         actor_uid=actor.uid, | ||||
|                         task_uid=(task.name, id(task)),  # task uuid (effectively) | ||||
|                         shield=shield, | ||||
|                     ) | ||||
|                 ) | ||||
|             ) | ||||
|             # XXX sanity, our locker task should be the one which | ||||
|             # entered a new IPC ctx with the root actor, NOT the one | ||||
|             # that exists around the task calling into `._pause()`. | ||||
|  | @ -2147,6 +2153,13 @@ async def post_mortem( | |||
|     **_pause_kwargs, | ||||
| 
 | ||||
| ) -> None: | ||||
|     ''' | ||||
|     `tractor`'s builtin async equivalent of `pdb.post_mortem()` | ||||
|     which can be used inside exception handlers. | ||||
| 
 | ||||
|     It's also used for the crash handler when `debug_mode == True` ;) | ||||
| 
 | ||||
|     ''' | ||||
|     __tracebackhide__: bool = hide_tb | ||||
| 
 | ||||
|     tb: TracebackType = tb or sys.exc_info()[2] | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue