forked from goodboy/tractor
Use context for remote debugger locking
A context is the natural fit (vs. a receive stream) for locking the root proc's tty usage via it's `.started()` sync point. Simplify the `_breakpoin()` routine to be a simple async func instead of all this "returning a coroutine" stuff from before we decided that `tractor.breakpoint()` must be async. Use `runtime` level for locking logging making it easier to trace.windows_bi_streaming
parent
9793851134
commit
f0ceb9a811
|
@ -7,7 +7,6 @@ from functools import partial
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from typing import Awaitable, Tuple, Optional, Callable, AsyncIterator
|
from typing import Awaitable, Tuple, Optional, Callable, AsyncIterator
|
||||||
|
|
||||||
from async_generator import aclosing
|
|
||||||
import tractor
|
import tractor
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
|
@ -38,7 +37,9 @@ _pdb_release_hook: Optional[Callable] = None
|
||||||
_in_debug = False
|
_in_debug = False
|
||||||
|
|
||||||
# lock in root actor preventing multi-access to local tty
|
# lock in root actor preventing multi-access to local tty
|
||||||
_debug_lock = trio.StrictFIFOLock()
|
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
|
||||||
|
_debug_lock._uid = None
|
||||||
|
_pdb_complete: trio.Event = None
|
||||||
|
|
||||||
# XXX: set by the current task waiting on the root tty lock
|
# XXX: set by the current task waiting on the root tty lock
|
||||||
# and must be cancelled if this actor is cancelled via message
|
# and must be cancelled if this actor is cancelled via message
|
||||||
|
@ -119,18 +120,21 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
|
||||||
"""Acquire a actor local FIFO lock meant to mutex entry to a local
|
"""Acquire a actor local FIFO lock meant to mutex entry to a local
|
||||||
debugger entry point to avoid tty clobbering by multiple processes.
|
debugger entry point to avoid tty clobbering by multiple processes.
|
||||||
"""
|
"""
|
||||||
task_name = trio.lowlevel.current_task().name
|
global _debug_lock
|
||||||
try:
|
|
||||||
log.debug(
|
|
||||||
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}")
|
|
||||||
await _debug_lock.acquire()
|
|
||||||
|
|
||||||
log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}")
|
task_name = trio.lowlevel.current_task().name
|
||||||
|
|
||||||
|
log.runtime(
|
||||||
|
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}")
|
||||||
|
|
||||||
|
async with _debug_lock:
|
||||||
|
|
||||||
|
_debug_lock._uid = uid
|
||||||
|
log.runtime(f"TTY lock acquired, remote task: {task_name}:{uid}")
|
||||||
yield
|
yield
|
||||||
|
|
||||||
finally:
|
_debug_lock._uid = None
|
||||||
_debug_lock.release()
|
log.runtime(f"TTY lock released, remote task: {task_name}:{uid}")
|
||||||
log.debug(f"TTY lock released, remote task: {task_name}:{uid}")
|
|
||||||
|
|
||||||
|
|
||||||
# @contextmanager
|
# @contextmanager
|
||||||
|
@ -144,73 +148,96 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
|
||||||
# signal.signal(signal.SIGINT, prior_handler)
|
# signal.signal(signal.SIGINT, prior_handler)
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
async def _hijack_stdin_relay_to_child(
|
async def _hijack_stdin_relay_to_child(
|
||||||
subactor_uid: Tuple[str, str]
|
|
||||||
) -> AsyncIterator[str]:
|
|
||||||
# TODO: when we get to true remote debugging
|
|
||||||
# this will deliver stdin data
|
|
||||||
log.warning(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
|
|
||||||
async with _acquire_debug_lock(subactor_uid):
|
|
||||||
log.warning(f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
|
|
||||||
|
|
||||||
# with _disable_sigint():
|
ctx: tractor.context,
|
||||||
|
subactor_uid: Tuple[str, str]
|
||||||
|
|
||||||
|
) -> AsyncIterator[str]:
|
||||||
|
|
||||||
|
global _pdb_complete
|
||||||
|
|
||||||
|
task_name = trio.lowlevel.current_task().name
|
||||||
|
|
||||||
|
# TODO: when we get to true remote debugging
|
||||||
|
# this will deliver stdin data?
|
||||||
|
|
||||||
|
log.debug(
|
||||||
|
"Attempting to acquire TTY lock, "
|
||||||
|
f"remote task: {task_name}:{subactor_uid}"
|
||||||
|
)
|
||||||
|
|
||||||
|
log.runtime(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
|
||||||
|
|
||||||
|
async with _acquire_debug_lock(subactor_uid):
|
||||||
|
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
|
||||||
# indicate to child that we've locked stdio
|
# indicate to child that we've locked stdio
|
||||||
yield 'Locked'
|
await ctx.started('Locked')
|
||||||
|
log.runtime(f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
|
||||||
|
|
||||||
# wait for cancellation of stream by child
|
# wait for unlock pdb by child
|
||||||
# indicating debugger is dis-engaged
|
async with ctx.open_stream() as stream:
|
||||||
await trio.sleep_forever()
|
assert await stream.receive() == 'Unlock'
|
||||||
|
|
||||||
|
log.runtime(
|
||||||
|
f"TTY lock released, remote task: {task_name}:{subactor_uid}")
|
||||||
|
|
||||||
log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock")
|
log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock")
|
||||||
|
|
||||||
|
|
||||||
# XXX: We only make this sync in case someone wants to
|
# XXX: We only make this sync in case someone wants to
|
||||||
# overload the ``breakpoint()`` built-in.
|
# overload the ``breakpoint()`` built-in.
|
||||||
def _breakpoint(debug_func) -> Awaitable[None]:
|
async def _breakpoint(debug_func) -> Awaitable[None]:
|
||||||
"""``tractor`` breakpoint entry for engaging pdb machinery
|
"""``tractor`` breakpoint entry for engaging pdb machinery
|
||||||
in subactors.
|
in subactors.
|
||||||
"""
|
"""
|
||||||
actor = tractor.current_actor()
|
actor = tractor.current_actor()
|
||||||
do_unlock = trio.Event()
|
task_name = trio.lowlevel.current_task().name
|
||||||
|
|
||||||
|
global _pdb_complete
|
||||||
|
global _pdb_release_hook
|
||||||
|
global _in_debug
|
||||||
|
|
||||||
async def wait_for_parent_stdin_hijack(
|
async def wait_for_parent_stdin_hijack(
|
||||||
task_status=trio.TASK_STATUS_IGNORED
|
task_status=trio.TASK_STATUS_IGNORED
|
||||||
):
|
):
|
||||||
global _debugger_request_cs
|
global _debugger_request_cs
|
||||||
|
|
||||||
with trio.CancelScope() as cs:
|
with trio.CancelScope() as cs:
|
||||||
_debugger_request_cs = cs
|
_debugger_request_cs = cs
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with get_root() as portal:
|
async with get_root() as portal:
|
||||||
async with portal.open_stream_from(
|
|
||||||
|
# this syncs to child's ``Context.started()`` call.
|
||||||
|
async with portal.open_context(
|
||||||
|
|
||||||
tractor._debug._hijack_stdin_relay_to_child,
|
tractor._debug._hijack_stdin_relay_to_child,
|
||||||
subactor_uid=actor.uid,
|
subactor_uid=actor.uid,
|
||||||
) as stream:
|
|
||||||
|
|
||||||
# block until first yield above
|
) as (ctx, val):
|
||||||
async for val in stream:
|
|
||||||
|
|
||||||
assert val == 'Locked'
|
assert val == 'Locked'
|
||||||
|
|
||||||
|
async with ctx.open_stream() as stream:
|
||||||
|
|
||||||
|
# unblock local caller
|
||||||
task_status.started()
|
task_status.started()
|
||||||
|
|
||||||
# with trio.CancelScope(shield=True):
|
await _pdb_complete.wait()
|
||||||
await do_unlock.wait()
|
await stream.send('Unlock')
|
||||||
|
|
||||||
# trigger cancellation of remote stream
|
|
||||||
break
|
|
||||||
finally:
|
finally:
|
||||||
log.debug(f"Exiting debugger for actor {actor}")
|
log.debug(f"Exiting debugger for actor {actor}")
|
||||||
global _in_debug
|
global _in_debug
|
||||||
_in_debug = False
|
_in_debug = False
|
||||||
log.debug(f"Child {actor} released parent stdio lock")
|
log.debug(f"Child {actor} released parent stdio lock")
|
||||||
|
|
||||||
async def _bp():
|
if not _pdb_complete or _pdb_complete.is_set():
|
||||||
"""Async breakpoint which schedules a parent stdio lock, and once complete
|
_pdb_complete = trio.Event()
|
||||||
enters the ``pdbpp`` debugging console.
|
|
||||||
"""
|
|
||||||
task_name = trio.lowlevel.current_task().name
|
|
||||||
|
|
||||||
global _in_debug
|
|
||||||
|
|
||||||
# TODO: need a more robust check for the "root" actor
|
# TODO: need a more robust check for the "root" actor
|
||||||
if actor._parent_chan and not is_root_process():
|
if actor._parent_chan and not is_root_process():
|
||||||
|
@ -225,38 +252,56 @@ def _breakpoint(debug_func) -> Awaitable[None]:
|
||||||
# support for recursive entries to `tractor.breakpoint()`
|
# support for recursive entries to `tractor.breakpoint()`
|
||||||
log.warning(
|
log.warning(
|
||||||
f"Actor {actor.uid} already has a debug lock, waiting...")
|
f"Actor {actor.uid} already has a debug lock, waiting...")
|
||||||
await do_unlock.wait()
|
await _pdb_complete.wait()
|
||||||
await trio.sleep(0.1)
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
# assign unlock callback for debugger teardown hooks
|
|
||||||
global _pdb_release_hook
|
|
||||||
_pdb_release_hook = do_unlock.set
|
|
||||||
|
|
||||||
# mark local actor as "in debug mode" to avoid recurrent
|
# mark local actor as "in debug mode" to avoid recurrent
|
||||||
# entries/requests to the root process
|
# entries/requests to the root process
|
||||||
_in_debug = task_name
|
_in_debug = task_name
|
||||||
|
|
||||||
|
# assign unlock callback for debugger teardown hooks
|
||||||
|
_pdb_release_hook = _pdb_complete.set
|
||||||
|
|
||||||
# this **must** be awaited by the caller and is done using the
|
# this **must** be awaited by the caller and is done using the
|
||||||
# root nursery so that the debugger can continue to run without
|
# root nursery so that the debugger can continue to run without
|
||||||
# being restricted by the scope of a new task nursery.
|
# being restricted by the scope of a new task nursery.
|
||||||
await actor._service_n.start(wait_for_parent_stdin_hijack)
|
await actor._service_n.start(wait_for_parent_stdin_hijack)
|
||||||
|
|
||||||
elif is_root_process():
|
elif is_root_process():
|
||||||
|
|
||||||
# we also wait in the root-parent for any child that
|
# we also wait in the root-parent for any child that
|
||||||
# may have the tty locked prior
|
# may have the tty locked prior
|
||||||
if _debug_lock.locked(): # root process already has it; ignore
|
global _debug_lock
|
||||||
|
|
||||||
|
# TODO: wait, what about multiple root tasks acquiring
|
||||||
|
# it though.. shrug?
|
||||||
|
# root process (us) already has it; ignore
|
||||||
|
if _debug_lock._uid == actor.uid:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# XXX: since we need to enter pdb synchronously below,
|
||||||
|
# we have to release the lock manually from pdb completion
|
||||||
|
# callbacks. Can't think of a nicer way then this atm.
|
||||||
await _debug_lock.acquire()
|
await _debug_lock.acquire()
|
||||||
_pdb_release_hook = _debug_lock.release
|
|
||||||
|
_debug_lock._uid = actor.uid
|
||||||
|
|
||||||
|
# the lock must be released on pdb completion
|
||||||
|
def teardown():
|
||||||
|
global _pdb_complete
|
||||||
|
global _debug_lock
|
||||||
|
|
||||||
|
_debug_lock.release()
|
||||||
|
_debug_lock._uid = None
|
||||||
|
_pdb_complete.set()
|
||||||
|
|
||||||
|
_pdb_release_hook = teardown
|
||||||
|
|
||||||
# block here one (at the appropriate frame *up* where
|
# block here one (at the appropriate frame *up* where
|
||||||
# ``breakpoint()`` was awaited and begin handling stdio
|
# ``breakpoint()`` was awaited and begin handling stdio
|
||||||
log.debug("Entering the synchronous world of pdb")
|
log.debug("Entering the synchronous world of pdb")
|
||||||
debug_func(actor)
|
debug_func(actor)
|
||||||
|
|
||||||
# user code **must** await this!
|
|
||||||
return _bp()
|
|
||||||
|
|
||||||
|
|
||||||
def _mk_pdb():
|
def _mk_pdb():
|
||||||
# XXX: setting these flags on the pdb instance are absolutely
|
# XXX: setting these flags on the pdb instance are absolutely
|
||||||
|
|
Loading…
Reference in New Issue