Use context for remote debugger locking
A context is the natural fit (vs. a receive stream) for locking the root proc's tty usage via it's `.started()` sync point. Simplify the `_breakpoin()` routine to be a simple async func instead of all this "returning a coroutine" stuff from before we decided that `tractor.breakpoint()` must be async. Use `runtime` level for locking logging making it easier to trace.transport_hardening
							parent
							
								
									207a88e3a8
								
							
						
					
					
						commit
						6f62277c82
					
				|  | @ -7,7 +7,6 @@ from functools import partial | ||||||
| from contextlib import asynccontextmanager | from contextlib import asynccontextmanager | ||||||
| from typing import Awaitable, Tuple, Optional, Callable, AsyncIterator | from typing import Awaitable, Tuple, Optional, Callable, AsyncIterator | ||||||
| 
 | 
 | ||||||
| from async_generator import aclosing |  | ||||||
| import tractor | import tractor | ||||||
| import trio | import trio | ||||||
| 
 | 
 | ||||||
|  | @ -38,7 +37,9 @@ _pdb_release_hook: Optional[Callable] = None | ||||||
| _in_debug = False | _in_debug = False | ||||||
| 
 | 
 | ||||||
| # lock in root actor preventing multi-access to local tty | # lock in root actor preventing multi-access to local tty | ||||||
| _debug_lock = trio.StrictFIFOLock() | _debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() | ||||||
|  | _debug_lock._uid = None | ||||||
|  | _pdb_complete: trio.Event = None | ||||||
| 
 | 
 | ||||||
| # XXX: set by the current task waiting on the root tty lock | # XXX: set by the current task waiting on the root tty lock | ||||||
| # and must be cancelled if this actor is cancelled via message | # and must be cancelled if this actor is cancelled via message | ||||||
|  | @ -119,18 +120,21 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]: | ||||||
|     """Acquire a actor local FIFO lock meant to mutex entry to a local |     """Acquire a actor local FIFO lock meant to mutex entry to a local | ||||||
|     debugger entry point to avoid tty clobbering by multiple processes. |     debugger entry point to avoid tty clobbering by multiple processes. | ||||||
|     """ |     """ | ||||||
|     task_name = trio.lowlevel.current_task().name |     global _debug_lock | ||||||
|     try: |  | ||||||
|         log.debug( |  | ||||||
|             f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}") |  | ||||||
|         await _debug_lock.acquire() |  | ||||||
| 
 | 
 | ||||||
|         log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}") |     task_name = trio.lowlevel.current_task().name | ||||||
|  | 
 | ||||||
|  |     log.runtime( | ||||||
|  |         f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}") | ||||||
|  | 
 | ||||||
|  |     async with _debug_lock: | ||||||
|  | 
 | ||||||
|  |         _debug_lock._uid = uid | ||||||
|  |         log.runtime(f"TTY lock acquired, remote task: {task_name}:{uid}") | ||||||
|         yield |         yield | ||||||
| 
 | 
 | ||||||
|     finally: |     _debug_lock._uid = None | ||||||
|         _debug_lock.release() |     log.runtime(f"TTY lock released, remote task: {task_name}:{uid}") | ||||||
|         log.debug(f"TTY lock released, remote task: {task_name}:{uid}") |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # @contextmanager | # @contextmanager | ||||||
|  | @ -144,118 +148,159 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]: | ||||||
| #         signal.signal(signal.SIGINT, prior_handler) | #         signal.signal(signal.SIGINT, prior_handler) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @tractor.context | ||||||
| async def _hijack_stdin_relay_to_child( | async def _hijack_stdin_relay_to_child( | ||||||
|  | 
 | ||||||
|  |     ctx: tractor.context, | ||||||
|     subactor_uid: Tuple[str, str] |     subactor_uid: Tuple[str, str] | ||||||
|  | 
 | ||||||
| ) -> AsyncIterator[str]: | ) -> AsyncIterator[str]: | ||||||
|  | 
 | ||||||
|  |     global _pdb_complete | ||||||
|  | 
 | ||||||
|  |     task_name = trio.lowlevel.current_task().name | ||||||
|  | 
 | ||||||
|     # TODO: when we get to true remote debugging |     # TODO: when we get to true remote debugging | ||||||
|     # this will deliver stdin data |     # this will deliver stdin data? | ||||||
|     log.warning(f"Actor {subactor_uid} is WAITING on stdin hijack lock") | 
 | ||||||
|  |     log.debug( | ||||||
|  |         "Attempting to acquire TTY lock, " | ||||||
|  |         f"remote task: {task_name}:{subactor_uid}" | ||||||
|  |     ) | ||||||
|  | 
 | ||||||
|  |     log.runtime(f"Actor {subactor_uid} is WAITING on stdin hijack lock") | ||||||
|  | 
 | ||||||
|     async with _acquire_debug_lock(subactor_uid): |     async with _acquire_debug_lock(subactor_uid): | ||||||
|         log.warning(f"Actor {subactor_uid} ACQUIRED stdin hijack lock") |  | ||||||
| 
 | 
 | ||||||
|         # with _disable_sigint(): |         with trio.CancelScope(shield=True): | ||||||
| 
 | 
 | ||||||
|         # indicate to child that we've locked stdio |             # indicate to child that we've locked stdio | ||||||
|         yield 'Locked' |             await ctx.started('Locked') | ||||||
|  |             log.runtime(f"Actor {subactor_uid} ACQUIRED stdin hijack lock") | ||||||
| 
 | 
 | ||||||
|         # wait for cancellation of stream by child |         # wait for unlock pdb by child | ||||||
|         # indicating debugger is dis-engaged |         async with ctx.open_stream() as stream: | ||||||
|         await trio.sleep_forever() |             assert await stream.receive() == 'Unlock' | ||||||
|  | 
 | ||||||
|  |     log.runtime( | ||||||
|  |         f"TTY lock released, remote task: {task_name}:{subactor_uid}") | ||||||
| 
 | 
 | ||||||
|     log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock") |     log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # XXX: We only make this sync in case someone wants to | # XXX: We only make this sync in case someone wants to | ||||||
| # overload the ``breakpoint()`` built-in. | # overload the ``breakpoint()`` built-in. | ||||||
| def _breakpoint(debug_func) -> Awaitable[None]: | async def _breakpoint(debug_func) -> Awaitable[None]: | ||||||
|     """``tractor`` breakpoint entry for engaging pdb machinery |     """``tractor`` breakpoint entry for engaging pdb machinery | ||||||
|     in subactors. |     in subactors. | ||||||
|     """ |     """ | ||||||
|     actor = tractor.current_actor() |     actor = tractor.current_actor() | ||||||
|     do_unlock = trio.Event() |     task_name = trio.lowlevel.current_task().name | ||||||
|  | 
 | ||||||
|  |     global _pdb_complete | ||||||
|  |     global _pdb_release_hook | ||||||
|  |     global _in_debug | ||||||
| 
 | 
 | ||||||
|     async def wait_for_parent_stdin_hijack( |     async def wait_for_parent_stdin_hijack( | ||||||
|         task_status=trio.TASK_STATUS_IGNORED |         task_status=trio.TASK_STATUS_IGNORED | ||||||
|     ): |     ): | ||||||
|         global _debugger_request_cs |         global _debugger_request_cs | ||||||
|  | 
 | ||||||
|         with trio.CancelScope() as cs: |         with trio.CancelScope() as cs: | ||||||
|             _debugger_request_cs = cs |             _debugger_request_cs = cs | ||||||
|  | 
 | ||||||
|             try: |             try: | ||||||
|                 async with get_root() as portal: |                 async with get_root() as portal: | ||||||
|                         async with portal.open_stream_from( |  | ||||||
|                             tractor._debug._hijack_stdin_relay_to_child, |  | ||||||
|                             subactor_uid=actor.uid, |  | ||||||
|                         ) as stream: |  | ||||||
| 
 | 
 | ||||||
|                                 # block until first yield above |                     # this syncs to child's ``Context.started()`` call. | ||||||
|                                 async for val in stream: |                     async with portal.open_context( | ||||||
| 
 | 
 | ||||||
|                                     assert val == 'Locked' |                         tractor._debug._hijack_stdin_relay_to_child, | ||||||
|                                     task_status.started() |                         subactor_uid=actor.uid, | ||||||
| 
 | 
 | ||||||
|                                     # with trio.CancelScope(shield=True): |                     ) as (ctx, val): | ||||||
|                                     await do_unlock.wait() | 
 | ||||||
|  |                         assert val == 'Locked' | ||||||
|  | 
 | ||||||
|  |                         async with ctx.open_stream() as stream: | ||||||
|  | 
 | ||||||
|  |                             # unblock local caller | ||||||
|  |                             task_status.started() | ||||||
|  | 
 | ||||||
|  |                             await _pdb_complete.wait() | ||||||
|  |                             await stream.send('Unlock') | ||||||
| 
 | 
 | ||||||
|                                     # trigger cancellation of remote stream |  | ||||||
|                                     break |  | ||||||
|             finally: |             finally: | ||||||
|                 log.debug(f"Exiting debugger for actor {actor}") |                 log.debug(f"Exiting debugger for actor {actor}") | ||||||
|                 global _in_debug |                 global _in_debug | ||||||
|                 _in_debug = False |                 _in_debug = False | ||||||
|                 log.debug(f"Child {actor} released parent stdio lock") |                 log.debug(f"Child {actor} released parent stdio lock") | ||||||
| 
 | 
 | ||||||
|     async def _bp(): |     if not _pdb_complete or _pdb_complete.is_set(): | ||||||
|         """Async breakpoint which schedules a parent stdio lock, and once complete |         _pdb_complete = trio.Event() | ||||||
|         enters the ``pdbpp`` debugging console. |  | ||||||
|         """ |  | ||||||
|         task_name = trio.lowlevel.current_task().name |  | ||||||
| 
 | 
 | ||||||
|         global _in_debug |     # TODO: need a more robust check for the "root" actor | ||||||
| 
 |     if actor._parent_chan and not is_root_process(): | ||||||
|         # TODO: need a more robust check for the "root" actor |         if _in_debug: | ||||||
|         if actor._parent_chan and not is_root_process(): |             if _in_debug == task_name: | ||||||
|             if _in_debug: |                 # this task already has the lock and is | ||||||
|                 if _in_debug == task_name: |                 # likely recurrently entering a breakpoint | ||||||
|                     # this task already has the lock and is |  | ||||||
|                     # likely recurrently entering a breakpoint |  | ||||||
|                     return |  | ||||||
| 
 |  | ||||||
|                 # if **this** actor is already in debug mode block here |  | ||||||
|                 # waiting for the control to be released - this allows |  | ||||||
|                 # support for recursive entries to `tractor.breakpoint()` |  | ||||||
|                 log.warning( |  | ||||||
|                     f"Actor {actor.uid} already has a debug lock, waiting...") |  | ||||||
|                 await do_unlock.wait() |  | ||||||
|                 await trio.sleep(0.1) |  | ||||||
| 
 |  | ||||||
|             # assign unlock callback for debugger teardown hooks |  | ||||||
|             global _pdb_release_hook |  | ||||||
|             _pdb_release_hook = do_unlock.set |  | ||||||
| 
 |  | ||||||
|             # mark local actor as "in debug mode" to avoid recurrent |  | ||||||
|             # entries/requests to the root process |  | ||||||
|             _in_debug = task_name |  | ||||||
| 
 |  | ||||||
|             # this **must** be awaited by the caller and is done using the |  | ||||||
|             # root nursery so that the debugger can continue to run without |  | ||||||
|             # being restricted by the scope of a new task nursery. |  | ||||||
|             await actor._service_n.start(wait_for_parent_stdin_hijack) |  | ||||||
| 
 |  | ||||||
|         elif is_root_process(): |  | ||||||
|             # we also wait in the root-parent for any child that |  | ||||||
|             # may have the tty locked prior |  | ||||||
|             if _debug_lock.locked():  # root process already has it; ignore |  | ||||||
|                 return |                 return | ||||||
|             await _debug_lock.acquire() |  | ||||||
|             _pdb_release_hook = _debug_lock.release |  | ||||||
| 
 | 
 | ||||||
|         # block here one (at the appropriate frame *up* where |             # if **this** actor is already in debug mode block here | ||||||
|         # ``breakpoint()`` was awaited and begin handling stdio |             # waiting for the control to be released - this allows | ||||||
|         log.debug("Entering the synchronous world of pdb") |             # support for recursive entries to `tractor.breakpoint()` | ||||||
|         debug_func(actor) |             log.warning( | ||||||
|  |                 f"Actor {actor.uid} already has a debug lock, waiting...") | ||||||
|  |             await _pdb_complete.wait() | ||||||
|  |             await trio.sleep(0.1) | ||||||
| 
 | 
 | ||||||
|     # user code **must** await this! |         # mark local actor as "in debug mode" to avoid recurrent | ||||||
|     return _bp() |         # entries/requests to the root process | ||||||
|  |         _in_debug = task_name | ||||||
|  | 
 | ||||||
|  |         # assign unlock callback for debugger teardown hooks | ||||||
|  |         _pdb_release_hook = _pdb_complete.set | ||||||
|  | 
 | ||||||
|  |         # this **must** be awaited by the caller and is done using the | ||||||
|  |         # root nursery so that the debugger can continue to run without | ||||||
|  |         # being restricted by the scope of a new task nursery. | ||||||
|  |         await actor._service_n.start(wait_for_parent_stdin_hijack) | ||||||
|  | 
 | ||||||
|  |     elif is_root_process(): | ||||||
|  | 
 | ||||||
|  |         # we also wait in the root-parent for any child that | ||||||
|  |         # may have the tty locked prior | ||||||
|  |         global _debug_lock | ||||||
|  | 
 | ||||||
|  |         # TODO: wait, what about multiple root tasks acquiring | ||||||
|  |         # it though.. shrug? | ||||||
|  |         # root process (us) already has it; ignore | ||||||
|  |         if _debug_lock._uid == actor.uid: | ||||||
|  |             return | ||||||
|  | 
 | ||||||
|  |         # XXX: since we need to enter pdb synchronously below, | ||||||
|  |         # we have to release the lock manually from pdb completion | ||||||
|  |         # callbacks. Can't think of a nicer way then this atm. | ||||||
|  |         await _debug_lock.acquire() | ||||||
|  | 
 | ||||||
|  |         _debug_lock._uid = actor.uid | ||||||
|  | 
 | ||||||
|  |         # the lock must be released on pdb completion | ||||||
|  |         def teardown(): | ||||||
|  |             global _pdb_complete | ||||||
|  |             global _debug_lock | ||||||
|  | 
 | ||||||
|  |             _debug_lock.release() | ||||||
|  |             _debug_lock._uid = None | ||||||
|  |             _pdb_complete.set() | ||||||
|  | 
 | ||||||
|  |         _pdb_release_hook = teardown | ||||||
|  | 
 | ||||||
|  |     # block here one (at the appropriate frame *up* where | ||||||
|  |     # ``breakpoint()`` was awaited and begin handling stdio | ||||||
|  |     log.debug("Entering the synchronous world of pdb") | ||||||
|  |     debug_func(actor) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def _mk_pdb(): | def _mk_pdb(): | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue