Use context for remote debugger locking

A context is the natural fit (vs. a receive stream) for locking the root
proc's tty usage via it's `.started()` sync point. Simplify the
`_breakpoin()` routine to be a simple async func instead of all this
"returning a coroutine" stuff from before we decided that
`tractor.breakpoint()` must be async. Use `runtime` level for locking
logging making it easier to trace.
bi_streaming
Tyler Goodlet 2021-05-10 07:25:55 -04:00
parent d59e9edec8
commit be022b8e2b
1 changed files with 127 additions and 82 deletions

View File

@ -7,7 +7,6 @@ from functools import partial
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Awaitable, Tuple, Optional, Callable, AsyncIterator from typing import Awaitable, Tuple, Optional, Callable, AsyncIterator
from async_generator import aclosing
import tractor import tractor
import trio import trio
@ -38,7 +37,9 @@ _pdb_release_hook: Optional[Callable] = None
_in_debug = False _in_debug = False
# lock in root actor preventing multi-access to local tty # lock in root actor preventing multi-access to local tty
_debug_lock = trio.StrictFIFOLock() _debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
_debug_lock._uid = None
_pdb_complete: trio.Event = None
# XXX: set by the current task waiting on the root tty lock # XXX: set by the current task waiting on the root tty lock
# and must be cancelled if this actor is cancelled via message # and must be cancelled if this actor is cancelled via message
@ -119,18 +120,21 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
"""Acquire a actor local FIFO lock meant to mutex entry to a local """Acquire a actor local FIFO lock meant to mutex entry to a local
debugger entry point to avoid tty clobbering by multiple processes. debugger entry point to avoid tty clobbering by multiple processes.
""" """
task_name = trio.lowlevel.current_task().name global _debug_lock
try:
log.debug(
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}")
await _debug_lock.acquire()
log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}") task_name = trio.lowlevel.current_task().name
log.runtime(
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}")
async with _debug_lock:
_debug_lock._uid = uid
log.runtime(f"TTY lock acquired, remote task: {task_name}:{uid}")
yield yield
finally: _debug_lock._uid = None
_debug_lock.release() log.runtime(f"TTY lock released, remote task: {task_name}:{uid}")
log.debug(f"TTY lock released, remote task: {task_name}:{uid}")
# @contextmanager # @contextmanager
@ -144,118 +148,159 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
# signal.signal(signal.SIGINT, prior_handler) # signal.signal(signal.SIGINT, prior_handler)
@tractor.context
async def _hijack_stdin_relay_to_child( async def _hijack_stdin_relay_to_child(
ctx: tractor.context,
subactor_uid: Tuple[str, str] subactor_uid: Tuple[str, str]
) -> AsyncIterator[str]: ) -> AsyncIterator[str]:
global _pdb_complete
task_name = trio.lowlevel.current_task().name
# TODO: when we get to true remote debugging # TODO: when we get to true remote debugging
# this will deliver stdin data # this will deliver stdin data?
log.warning(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
log.debug(
"Attempting to acquire TTY lock, "
f"remote task: {task_name}:{subactor_uid}"
)
log.runtime(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
async with _acquire_debug_lock(subactor_uid): async with _acquire_debug_lock(subactor_uid):
log.warning(f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
# with _disable_sigint(): with trio.CancelScope(shield=True):
# indicate to child that we've locked stdio # indicate to child that we've locked stdio
yield 'Locked' await ctx.started('Locked')
log.runtime(f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
# wait for cancellation of stream by child # wait for unlock pdb by child
# indicating debugger is dis-engaged async with ctx.open_stream() as stream:
await trio.sleep_forever() assert await stream.receive() == 'Unlock'
log.runtime(
f"TTY lock released, remote task: {task_name}:{subactor_uid}")
log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock") log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock")
# XXX: We only make this sync in case someone wants to # XXX: We only make this sync in case someone wants to
# overload the ``breakpoint()`` built-in. # overload the ``breakpoint()`` built-in.
def _breakpoint(debug_func) -> Awaitable[None]: async def _breakpoint(debug_func) -> Awaitable[None]:
"""``tractor`` breakpoint entry for engaging pdb machinery """``tractor`` breakpoint entry for engaging pdb machinery
in subactors. in subactors.
""" """
actor = tractor.current_actor() actor = tractor.current_actor()
do_unlock = trio.Event() task_name = trio.lowlevel.current_task().name
global _pdb_complete
global _pdb_release_hook
global _in_debug
async def wait_for_parent_stdin_hijack( async def wait_for_parent_stdin_hijack(
task_status=trio.TASK_STATUS_IGNORED task_status=trio.TASK_STATUS_IGNORED
): ):
global _debugger_request_cs global _debugger_request_cs
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
_debugger_request_cs = cs _debugger_request_cs = cs
try: try:
async with get_root() as portal: async with get_root() as portal:
async with portal.open_stream_from(
tractor._debug._hijack_stdin_relay_to_child,
subactor_uid=actor.uid,
) as stream:
# block until first yield above # this syncs to child's ``Context.started()`` call.
async for val in stream: async with portal.open_context(
assert val == 'Locked' tractor._debug._hijack_stdin_relay_to_child,
task_status.started() subactor_uid=actor.uid,
# with trio.CancelScope(shield=True): ) as (ctx, val):
await do_unlock.wait()
assert val == 'Locked'
async with ctx.open_stream() as stream:
# unblock local caller
task_status.started()
await _pdb_complete.wait()
await stream.send('Unlock')
# trigger cancellation of remote stream
break
finally: finally:
log.debug(f"Exiting debugger for actor {actor}") log.debug(f"Exiting debugger for actor {actor}")
global _in_debug global _in_debug
_in_debug = False _in_debug = False
log.debug(f"Child {actor} released parent stdio lock") log.debug(f"Child {actor} released parent stdio lock")
async def _bp(): if not _pdb_complete or _pdb_complete.is_set():
"""Async breakpoint which schedules a parent stdio lock, and once complete _pdb_complete = trio.Event()
enters the ``pdbpp`` debugging console.
"""
task_name = trio.lowlevel.current_task().name
global _in_debug # TODO: need a more robust check for the "root" actor
if actor._parent_chan and not is_root_process():
# TODO: need a more robust check for the "root" actor if _in_debug:
if actor._parent_chan and not is_root_process(): if _in_debug == task_name:
if _in_debug: # this task already has the lock and is
if _in_debug == task_name: # likely recurrently entering a breakpoint
# this task already has the lock and is
# likely recurrently entering a breakpoint
return
# if **this** actor is already in debug mode block here
# waiting for the control to be released - this allows
# support for recursive entries to `tractor.breakpoint()`
log.warning(
f"Actor {actor.uid} already has a debug lock, waiting...")
await do_unlock.wait()
await trio.sleep(0.1)
# assign unlock callback for debugger teardown hooks
global _pdb_release_hook
_pdb_release_hook = do_unlock.set
# mark local actor as "in debug mode" to avoid recurrent
# entries/requests to the root process
_in_debug = task_name
# this **must** be awaited by the caller and is done using the
# root nursery so that the debugger can continue to run without
# being restricted by the scope of a new task nursery.
await actor._service_n.start(wait_for_parent_stdin_hijack)
elif is_root_process():
# we also wait in the root-parent for any child that
# may have the tty locked prior
if _debug_lock.locked(): # root process already has it; ignore
return return
await _debug_lock.acquire()
_pdb_release_hook = _debug_lock.release
# block here one (at the appropriate frame *up* where # if **this** actor is already in debug mode block here
# ``breakpoint()`` was awaited and begin handling stdio # waiting for the control to be released - this allows
log.debug("Entering the synchronous world of pdb") # support for recursive entries to `tractor.breakpoint()`
debug_func(actor) log.warning(
f"Actor {actor.uid} already has a debug lock, waiting...")
await _pdb_complete.wait()
await trio.sleep(0.1)
# user code **must** await this! # mark local actor as "in debug mode" to avoid recurrent
return _bp() # entries/requests to the root process
_in_debug = task_name
# assign unlock callback for debugger teardown hooks
_pdb_release_hook = _pdb_complete.set
# this **must** be awaited by the caller and is done using the
# root nursery so that the debugger can continue to run without
# being restricted by the scope of a new task nursery.
await actor._service_n.start(wait_for_parent_stdin_hijack)
elif is_root_process():
# we also wait in the root-parent for any child that
# may have the tty locked prior
global _debug_lock
# TODO: wait, what about multiple root tasks acquiring
# it though.. shrug?
# root process (us) already has it; ignore
if _debug_lock._uid == actor.uid:
return
# XXX: since we need to enter pdb synchronously below,
# we have to release the lock manually from pdb completion
# callbacks. Can't think of a nicer way then this atm.
await _debug_lock.acquire()
_debug_lock._uid = actor.uid
# the lock must be released on pdb completion
def teardown():
global _pdb_complete
global _debug_lock
_debug_lock.release()
_debug_lock._uid = None
_pdb_complete.set()
_pdb_release_hook = teardown
# block here one (at the appropriate frame *up* where
# ``breakpoint()`` was awaited and begin handling stdio
log.debug("Entering the synchronous world of pdb")
debug_func(actor)
def _mk_pdb(): def _mk_pdb():