From 83f1e79fdd9f37dc47a12f23f33fdaac94af55ba Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 10 May 2021 07:25:55 -0400 Subject: [PATCH] Use context for remote debugger locking A context is the natural fit (vs. a receive stream) for locking the root proc's tty usage via it's `.started()` sync point. Simplify the `_breakpoin()` routine to be a simple async func instead of all this "returning a coroutine" stuff from before we decided that `tractor.breakpoint()` must be async. Use `runtime` level for locking logging making it easier to trace. --- tractor/_debug.py | 209 ++++++++++++++++++++++++++++------------------ 1 file changed, 127 insertions(+), 82 deletions(-) diff --git a/tractor/_debug.py b/tractor/_debug.py index 75e502a..b7aa5a6 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -7,7 +7,6 @@ from functools import partial from contextlib import asynccontextmanager from typing import Awaitable, Tuple, Optional, Callable, AsyncIterator -from async_generator import aclosing import tractor import trio @@ -38,7 +37,9 @@ _pdb_release_hook: Optional[Callable] = None _in_debug = False # lock in root actor preventing multi-access to local tty -_debug_lock = trio.StrictFIFOLock() +_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() +_debug_lock._uid = None +_pdb_complete: trio.Event = None # XXX: set by the current task waiting on the root tty lock # and must be cancelled if this actor is cancelled via message @@ -119,18 +120,21 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]: """Acquire a actor local FIFO lock meant to mutex entry to a local debugger entry point to avoid tty clobbering by multiple processes. """ - task_name = trio.lowlevel.current_task().name - try: - log.debug( - f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}") - await _debug_lock.acquire() + global _debug_lock - log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}") + task_name = trio.lowlevel.current_task().name + + log.runtime( + f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}") + + async with _debug_lock: + + _debug_lock._uid = uid + log.runtime(f"TTY lock acquired, remote task: {task_name}:{uid}") yield - finally: - _debug_lock.release() - log.debug(f"TTY lock released, remote task: {task_name}:{uid}") + _debug_lock._uid = None + log.runtime(f"TTY lock released, remote task: {task_name}:{uid}") # @contextmanager @@ -144,118 +148,159 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]: # signal.signal(signal.SIGINT, prior_handler) +@tractor.context async def _hijack_stdin_relay_to_child( + + ctx: tractor.context, subactor_uid: Tuple[str, str] + ) -> AsyncIterator[str]: + + global _pdb_complete + + task_name = trio.lowlevel.current_task().name + # TODO: when we get to true remote debugging - # this will deliver stdin data - log.warning(f"Actor {subactor_uid} is WAITING on stdin hijack lock") + # this will deliver stdin data? + + log.debug( + "Attempting to acquire TTY lock, " + f"remote task: {task_name}:{subactor_uid}" + ) + + log.runtime(f"Actor {subactor_uid} is WAITING on stdin hijack lock") + async with _acquire_debug_lock(subactor_uid): - log.warning(f"Actor {subactor_uid} ACQUIRED stdin hijack lock") - # with _disable_sigint(): + with trio.CancelScope(shield=True): - # indicate to child that we've locked stdio - yield 'Locked' + # indicate to child that we've locked stdio + await ctx.started('Locked') + log.runtime(f"Actor {subactor_uid} ACQUIRED stdin hijack lock") - # wait for cancellation of stream by child - # indicating debugger is dis-engaged - await trio.sleep_forever() + # wait for unlock pdb by child + async with ctx.open_stream() as stream: + assert await stream.receive() == 'Unlock' + + log.runtime( + f"TTY lock released, remote task: {task_name}:{subactor_uid}") log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock") # XXX: We only make this sync in case someone wants to # overload the ``breakpoint()`` built-in. -def _breakpoint(debug_func) -> Awaitable[None]: +async def _breakpoint(debug_func) -> Awaitable[None]: """``tractor`` breakpoint entry for engaging pdb machinery in subactors. """ actor = tractor.current_actor() - do_unlock = trio.Event() + task_name = trio.lowlevel.current_task().name + + global _pdb_complete + global _pdb_release_hook + global _in_debug async def wait_for_parent_stdin_hijack( task_status=trio.TASK_STATUS_IGNORED ): global _debugger_request_cs + with trio.CancelScope() as cs: _debugger_request_cs = cs + try: async with get_root() as portal: - async with portal.open_stream_from( - tractor._debug._hijack_stdin_relay_to_child, - subactor_uid=actor.uid, - ) as stream: - # block until first yield above - async for val in stream: + # this syncs to child's ``Context.started()`` call. + async with portal.open_context( - assert val == 'Locked' - task_status.started() + tractor._debug._hijack_stdin_relay_to_child, + subactor_uid=actor.uid, - # with trio.CancelScope(shield=True): - await do_unlock.wait() + ) as (ctx, val): + + assert val == 'Locked' + + async with ctx.open_stream() as stream: + + # unblock local caller + task_status.started() + + await _pdb_complete.wait() + await stream.send('Unlock') - # trigger cancellation of remote stream - break finally: log.debug(f"Exiting debugger for actor {actor}") global _in_debug _in_debug = False log.debug(f"Child {actor} released parent stdio lock") - async def _bp(): - """Async breakpoint which schedules a parent stdio lock, and once complete - enters the ``pdbpp`` debugging console. - """ - task_name = trio.lowlevel.current_task().name + if not _pdb_complete or _pdb_complete.is_set(): + _pdb_complete = trio.Event() - global _in_debug - - # TODO: need a more robust check for the "root" actor - if actor._parent_chan and not is_root_process(): - if _in_debug: - if _in_debug == task_name: - # this task already has the lock and is - # likely recurrently entering a breakpoint - return - - # if **this** actor is already in debug mode block here - # waiting for the control to be released - this allows - # support for recursive entries to `tractor.breakpoint()` - log.warning( - f"Actor {actor.uid} already has a debug lock, waiting...") - await do_unlock.wait() - await trio.sleep(0.1) - - # assign unlock callback for debugger teardown hooks - global _pdb_release_hook - _pdb_release_hook = do_unlock.set - - # mark local actor as "in debug mode" to avoid recurrent - # entries/requests to the root process - _in_debug = task_name - - # this **must** be awaited by the caller and is done using the - # root nursery so that the debugger can continue to run without - # being restricted by the scope of a new task nursery. - await actor._service_n.start(wait_for_parent_stdin_hijack) - - elif is_root_process(): - # we also wait in the root-parent for any child that - # may have the tty locked prior - if _debug_lock.locked(): # root process already has it; ignore + # TODO: need a more robust check for the "root" actor + if actor._parent_chan and not is_root_process(): + if _in_debug: + if _in_debug == task_name: + # this task already has the lock and is + # likely recurrently entering a breakpoint return - await _debug_lock.acquire() - _pdb_release_hook = _debug_lock.release - # block here one (at the appropriate frame *up* where - # ``breakpoint()`` was awaited and begin handling stdio - log.debug("Entering the synchronous world of pdb") - debug_func(actor) + # if **this** actor is already in debug mode block here + # waiting for the control to be released - this allows + # support for recursive entries to `tractor.breakpoint()` + log.warning( + f"Actor {actor.uid} already has a debug lock, waiting...") + await _pdb_complete.wait() + await trio.sleep(0.1) - # user code **must** await this! - return _bp() + # mark local actor as "in debug mode" to avoid recurrent + # entries/requests to the root process + _in_debug = task_name + + # assign unlock callback for debugger teardown hooks + _pdb_release_hook = _pdb_complete.set + + # this **must** be awaited by the caller and is done using the + # root nursery so that the debugger can continue to run without + # being restricted by the scope of a new task nursery. + await actor._service_n.start(wait_for_parent_stdin_hijack) + + elif is_root_process(): + + # we also wait in the root-parent for any child that + # may have the tty locked prior + global _debug_lock + + # TODO: wait, what about multiple root tasks acquiring + # it though.. shrug? + # root process (us) already has it; ignore + if _debug_lock._uid == actor.uid: + return + + # XXX: since we need to enter pdb synchronously below, + # we have to release the lock manually from pdb completion + # callbacks. Can't think of a nicer way then this atm. + await _debug_lock.acquire() + + _debug_lock._uid = actor.uid + + # the lock must be released on pdb completion + def teardown(): + global _pdb_complete + global _debug_lock + + _debug_lock.release() + _debug_lock._uid = None + _pdb_complete.set() + + _pdb_release_hook = teardown + + # block here one (at the appropriate frame *up* where + # ``breakpoint()`` was awaited and begin handling stdio + log.debug("Entering the synchronous world of pdb") + debug_func(actor) def _mk_pdb():